Introducing Airflow 2.8

  • Kenten Danas

The Apache Airflow project is as strong as ever, with a vast community of contributors continually adding and improving features at a high velocity. Airflow 2.8 was released this week, continuing the streak of minor releases that move the project forward every couple of months. This release contains more than 20 new features, over 60 improvements, and over 50 bug fixes.

Many of the updates in 2.8 center around broadening the DAG authoring experience, enhancing logging, and addressing other features related to day-to-day operations, while also laying the groundwork for future improvements.

This blog post will walk you through everything you need to know about this release, so you don’t miss any of the exciting additions and changes.

Airflow ObjectStore

One of the biggest updates in Airflow 2.8 is the implementation of AIP 58, the Airflow ObjectStore. With this update, Airflow offers an abstraction layer over common object stores like LocalFilesystem, S3, GCS, and Azure Blob Storage. What this means in practice is that you can interact with files stored in any of these systems using the same code, rather than relying on multiple storage-specific operators. This is especially useful when transferring files, since you can avoid using many different XToYTransferOperator-style operators, which can be difficult to maintain (especially if you have to add your own custom ones).

For example, the following code shows how to use the new ObjectStoragePath object in two functions to list files in an S3 bucket and copy them to a GCS bucket.

from airflow.decorators import task
from airflow.io.path import ObjectStoragePath

base_src = ObjectStoragePath("s3://my_bucket", conn_id="aws_default")
base_dst = ObjectStoragePath("gcs://my_bucket", conn_id="google_default")

# ...

@task
def list_files(base_path: ObjectStoragePath) -> list[ObjectStoragePath]:
    """List files in remote object storage."""
    path = base_path / "my_folder"
    files = [f for f in path.iterdir() if f.is_file()]
    return files

@task
def copy_file(dst: ObjectStoragePath, file: ObjectStoragePath):
    """Copy a file from remote to another object storage.
    The file is streamed in chunks using shutil.copyobj"""

    file.copy(dst=dst)

files_s3 = list_files(base_path=base_src)
copy_file.partial(dst=base_dst).expand(file=files_s3)

This example only scratches the surface of what you can do with ObjectStoragePath.
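Because ObjectStoragePath follows a familiar pathlib-style API, you can also read and write objects directly. Here is a minimal sketch of what that could look like (the bucket, key, and connection ID below are placeholders, not part of the example above):

from airflow.io.path import ObjectStoragePath

path = ObjectStoragePath("s3://my_bucket/my_folder/data.csv", conn_id="aws_default")

# Read the remote object as if it were a local file
with path.open("r") as f:
    first_line = f.readline()

# Write a new object in the same folder
(path.parent / "header_only.csv").write_text(first_line)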

Note that this feature is experimental and subject to change. It also opens the door for the development of additional features described in the AIP, like a standardized way for XCom to manage larger data volumes and the ability to process DAGs from an arbitrary location, which would simplify CI/CD. This is definitely one to watch for future updates.

Listener hook for datasets

Airflow listeners are an experimental feature that allows you to subscribe to certain events in your Airflow environment. You can think of them as being like a callback function, but for your whole Airflow instance rather than for a specific DAG or task.

In Airflow 2.8, a listener hook for datasets was added, which allows you to subscribe to dataset creation and update events. This is particularly useful if you want to send a notification when multiple datasets have been updated, or trigger an external process based on dataset events.

To implement a listener, write your function using the listener hook you want (either on_dataset_created or on_dataset_changed):

# include/listener/dataset_listener.py

from airflow import Dataset
from airflow.listeners import hookimpl
from airflow.providers.slack.notifications.slack import send_slack_notification


@hookimpl
def on_dataset_changed(dataset: Dataset):
    """Execute when dataset change is registered."""
    print("I am always listening for any Dataset changes and I heard that!")
    print("Posting to Slack...")
    notifier = send_slack_notification(
        slack_conn_id="my_slack_conn",
        text=f"Dataset {dataset.uri} was changed!",
        channel="#alerts",
    )
    # Calling notify directly sends the message outside of a task context
    notifier.notify(context={})
    print("Done!")
    if dataset.uri == "s3://my-bucket/my-dataset.csv":
        print("This is a dataset I want a specific notification for!")
        print("Let's do something else...")
        print("Done!")

The listener function will run any time the event it subscribes to occurs (a dataset change, in this case), but you can add logic to execute part of the function only for specific datasets.

Then you add it to your Airflow instance using a plugin:

from airflow.plugins_manager import AirflowPlugin

from include.listener import dataset_listener


class DatasetListenerPlugin(AirflowPlugin):
    # A list of listeners the plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    name = "dataset_listener_plugin"
    listeners = [dataset_listener]
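With the plugin registered, any task that declares the dataset as an outlet will fire the listener when it completes successfully. Here is a minimal sketch of such a producer DAG (the DAG and task names are illustrative, not part of the release):

from pendulum import datetime

from airflow.datasets import Dataset
from airflow.decorators import dag, task

my_dataset = Dataset("s3://my-bucket/my-dataset.csv")


@dag(start_date=datetime(2023, 12, 1), schedule=None, catchup=False)
def produce_my_dataset():
    @task(outlets=[my_dataset])
    def update_dataset():
        # Update the underlying object here; Airflow registers the dataset
        # change when this task finishes successfully, firing the listener.
        print("Updating s3://my-bucket/my-dataset.csv")

    update_dataset()


produce_my_dataset()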

Enhanced logging capabilities

If you have ever dealt with an Airflow task failing because of something external to the task - maybe your worker ran out of memory, or your task got stuck in queued and timed out - you know how difficult these scenarios can be to debug. Previously, you had to dig through the component logs for your Scheduler, Executor, etc. to try and figure out what happened.

Now, in Airflow 2.8, there is a new TaskContextLogger which forwards component logs to the task logs. So if your task has an issue unrelated to the task itself, relevant logs will show up in the task logs, and you won’t have to leave the Airflow UI to figure out the cause of failure.

[Image: Airflow UI task logs showing forwarded component log messages]

This feature is turned on by default, and can be controlled with enable_task_context_logger in the logging config.
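For example, if you want to turn it off, a minimal airflow.cfg sketch looks like this (the same setting can also be managed through Airflow's standard AIRFLOW__LOGGING__ENABLE_TASK_CONTEXT_LOGGER environment variable):

[logging]
enable_task_context_logger = False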

UI updates

Nearly every Airflow release brings great UI updates that improve the experience of working with Airflow, and Airflow 2.8 is no different.

Some updates make the UI easier to navigate, consolidating actions into views where users spend the most time. For instance, an XCom tab was added to the Grid view, so you no longer have to go to a different view to see the XComs generated by your tasks.

[Image: the new XCom tab in the Grid view]

You also now have the ability to clear downstream task instances when selecting tasks in the List Task Instances view.

[Image: clearing downstream task instances from the List Task Instances view]

Other UI updates improve the usability of key features, like recent updates to the Trigger DAG run UI. Now, when manually triggering a DAG, the Run ID and Logical date fields are hidden.

[Image: the updated Trigger DAG run form]

These fields used to be at the top of the form even though they rarely need to be changed, which often caused confusion for new users. The fields are still there if you need to update them - in the Generated JSON Configuration and Dagrun Options section - but they are no longer front and center for users looking to quickly trigger a manual run.

New operators and decorators

Several new branching operators and decorators were created for 2.8. All of them run branching tasks in virtual environments, either created at runtime for that task (virtualenv) or preexisting in your Airflow environment (external_python). Branching is a common way of inserting conditional logic into your DAGs, and these operators and decorators provide additional options for running that conditional logic in isolated Python environments.
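As a rough sketch of what this could look like with the @task.branch_virtualenv decorator (the DAG, task names, and pinned requirement below are illustrative):

from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2023, 12, 1), schedule=None, catchup=False)
def branching_in_isolated_envs():
    @task.branch_virtualenv(requirements=["pandas==2.1.4"])
    def choose_branch() -> str:
        # Runs in a virtualenv created at runtime for this task and returns
        # the task_id of the downstream task to follow.
        import pandas as pd

        return "small_path" if pd.__version__.startswith("2") else "big_path"

    @task
    def small_path():
        print("Took the small path")

    @task
    def big_path():
        print("Took the big path")

    choose_branch() >> [small_path(), big_path()]


branching_in_isolated_envs()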

Other awesome updates

There are lots more notable updates in 2.8 that you should be aware of, and even the highlights covered here barely scratch the surface. Regardless of how you use Airflow, there’s something for you in 2.8.

Learn more and get started

Airflow 2.8 has way more features and improvements than we can cover in a single blog post. To learn more, check out the full release notes, and join us for a webinar on January 11th that will cover the new release in more detail.

To try Airflow 2.8 and see all the great features for yourself, get started with a free 14-day trial of Astro. We offer same-day support for all new Airflow releases, so you don’t have to wait to take advantage of the latest and greatest features.
