---
title: Introducing Airflow 2.8
description: >-
  The latest minor Airflow release includes new features and improvements such
  as the Airflow ObjectStore, Listener hook for Datasets, enhanced logging
  capabilities, and more.
date: 2023-12-18T00:00:00.000Z
authors:
  - author: src/content/people/kenten-danas.md
tags: []
categories: []
canonical_url: 'https://www.astronomer.io/blog/introducing-airflow-2-8/'
---
<iframe src="https://player.mux.com/eWbz5KTSukm9bSLYs3Gyd2HwAzu00SuHSMa4AhnLii4c" class="acds-mux-video" title="Video player" allow="accelerometer; gyroscope; autoplay; encrypted-media; picture-in-picture;" allowfullscreen></iframe>

The Apache Airflow® project is as strong as ever, with a vast community of contributors continually adding and improving features at a high velocity. Airflow 2.8 was released this week, continuing the streak of minor releases that move the project forward every couple of months. This release contains more than 20 new features, over 60 improvements, and over 50 bug fixes.

Many of the updates in 2.8 center around broadening the DAG authoring experience, enhancing [logging](/docs/learn/logging), and addressing other features related to day-to-day operations, while also laying the groundwork for future improvements.

This blog post will walk you through everything you need to know about this release, so you don’t miss any of the exciting additions and changes.

## Airflow ObjectStore

One of the biggest updates in Airflow 2.8 is the implementation of [AIP 58](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263430565), the Airflow ObjectStore. With this update, Airflow offers an abstraction layer over common object stores like LocalFilesystem, S3, GCS, and Azure Blob Storage. What this means in practice is you can interact with files stored in any of these systems with the same code, rather than relying on multiple specific operators. This is especially useful when transferring files, so you can avoid using many different XtoYTransferOperator type-operators, which can be difficult to maintain (especially if you have to add your own custom ones).

For example, the following code shows how to use the new `ObjectStoragePath` object in two functions to list files in an S3 bucket and copy them to a GCS bucket.

```python
base_src = ObjectStoragePath("s3://my_bucket", conn_id=”aws_default”)
base_dst = ObjectStoragePath("gcs://my_bucket", conn_id=”google_default”)

# ...

@task
def list_files(base_path: ObjectStoragePath) -> list[ObjectStoragePath]:
    """List files in remote object storage."""
    path = base_path / "my_folder"
    files = [f for f in path.iterdir() if f.is_file()]
    return files

@task
def copy_file(dst: ObjectStoragePath, file: ObjectStoragePath):
    """Copy a file from remote to another object storage.
    The file is streamed in chunks using shutil.copyobj"""

    file.copy(dst=dst)

files_s3 = list_files(base_path=base_src)
copy_file.partial(dst=base_dst).expand(file=files_s3)
```

This example only scratches the surface of what you can do with `ObjectStoragePath`.

Note that this feature is experimental and subject to change. It also opens the door for development of additional features as described in the AIP, like a standardized way for XCom to manage larger data volumes, and allowing DAG processing to occur from an arbitrary location, simplifying CICD. This is definitely one to watch for future updates.

## Listener hook for datasets

Airflow listeners are an experimental feature that allows you to subscribe to certain events in your Airflow environment. You can think of them as kind of like a callback function, but for your whole Airflow instance, rather than a specific DAG or task.

In Airflow 2.8, a listener hook for datasets was added, which allows you to subscribe to dataset creation and update events. This is particularly useful if you want to send a notification when multiple datasets have been updated, or trigger an external process based on dataset events.

To implement, you need to write your function using the listener hook you want (either `on_dataset_changed` or `on_dataset_updated`):

```python
# include/listener/dataset_listener.py

from airflow import Dataset
from airflow.listeners import hookimpl
from airflow.providers.slack.notifications.slack import send_slack_notification


@hookimpl
def on_dataset_changed(dataset: Dataset):
    """Execute when dataset change is registered."""
    print("I am always listening for any Dataset changes and I heard that!")
    print("Posting to Slack...")
    send_slack_notification(
        slack_conn_id="my_slack_conn",
        text=f"Dataset {dataset.uri} was changed!",
        channel="#alerts"
    )
    print("Done!")
    if dataset.uri == "s3://my-bucket/my-dataset.csv":
        print("This is a dataset I want a specific notification for!")
        print("Let's do something else...")
        print("Done!")

```

The listener function will run anytime the criteria is met (`on_dataset_changed` in this case), but you can add additional logic to execute part of the function for only specific datasets.

Then you add it to your Airflow instance using a plugin:

```python
from include.listener import dataset_listener

class AirflowPlugin:
  # A list of Listeners that plugin provides. Listeners can register to
  # listen to particular events that happen in Airflow, like
  # TaskInstance state changes. Listeners are python modules.
  listeners = [dataset_listener]
```

## Enhanced logging capabilities

If you have ever dealt with an Airflow task failing because of something external to the task - maybe your worker ran out of memory, or your task got stuck in queued and timed out - you know it can be difficult to debug in these scenarios. Previously, you would have to go to the component logs for your Scheduler, Executor, etc. to try and figure out what happened.

Now, in Airflow 2.8, there is a new `TaskContextLogger` which forwards component logs to the task logs. So if your task has an issue unrelated to the task itself, relevant logs will show up in the task logs, and you won’t have to leave the Airflow UI to figure out the cause of failure.

![image4](/images/posts/2023/introducing-airflow-2-8/image4.png)

This feature is turned on by default, and can be controlled with `enable_task_context_logger` in the logging config.

## UI updates

Nearly every Airflow release brings great UI updates that improve the experience of working with Airflow, and Airflow 2.8 is no different.

Some updates make the UI easier to navigate, consolidating actions into views where users spend the most time. For instance, an XCom tab was added to the Grid view, so you no longer have to go to a different view to see the XComs generated by your tasks.

![image3](/images/posts/2023/introducing-airflow-2-8/image3.png)

You also now have the ability to clear downstream task instances when selecting tasks in the List Task Instances view.

![image2](/images/posts/2023/introducing-airflow-2-8/image2.png)

Other UI updates improve the usability of key features, like recent updates to the Trigger DAG run UI. Now, when manually triggering a DAG, the `Run ID` and `Logical date` fields are hidden.

![image1](/images/posts/2023/introducing-airflow-2-8/image1.png)

These fields used to be at the top of the form even though they rarely need to be changed, which often caused confusion for new users. The fields are still there if you need to update them - in the `Generated JSON Configuration and Dagrun Options` section - but are no longer in the fore view for users looking to quickly trigger a manual run.

## New operators and decorators

Several new branching operators and decorators were created for 2.8:

- BranchPythonVirtualenvOperator
- @task.branch_external_python
- @task.branch_virtualenv

All three of these run branching tasks in virtual environments, either created at runtime for that task (virtualenv) or preexisting in your Airflow environment (external_python). [Branching](/docs/learn/airflow-branch-operator) is a common way of inserting conditional logic into your DAGs, and these operators and decorators provide additional options for running that conditional logic in isolated Python environments.

## Other awesome updates

There are lots more notable updates in 2.8 that you should be aware of, including:

- For security, raw HTML code in DAG docs and DAG param descriptions is now disabled by default.
- There is a new `dag_run.clear_number` attribute that tracks how many times a DAG run has been cleared, which is used to improve Airflow reliability metrics and is useful for monitoring.
- You can now add a description to your pools, which is shown in the UI.
- There is a new `prev_end_date_success` method in the context to provide the end date of the last successful task instance.
- DAGs with `None` as their schedule no longer need to have an assigned `start_date`.

And even these updates barely scratch the surface. Regardless of how you use Airflow, there’s something for you in 2.8.

## Learn more and get started

Airflow 2.8 has way more features and improvements than we can cover in a single blog post. To learn more, check out the full [release notes](https://airflow.apache.org/docs/apache-airflow/2.8.0/release\_notes.html), and join us for a [webinar](/events/webinars/whats-new-in-airflow-2-8-video/) on January 11th that will cover the new release in more detail.

To try Airflow 2.8 and see all the great features for yourself, get started with a free [14-day trial of Astro](/lp/signup/). We offer same-day support for all new Airflow releases, so you don’t have to wait to take advantage of the latest and greatest features.
