Datasets and data-aware scheduling in Airflow
With Datasets, DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to these datasets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.
Datasets can help resolve common issues. For example, consider a data engineering team with a DAG that creates a dataset and a machine learning team with a DAG that trains a model on the dataset. Using datasets, the machine learning team’s DAG runs only when the data engineering team’s DAG has produced an update to the dataset.
In this guide, you’ll learn about datasets in Airflow and how to use them to implement triggering of DAGs based on dataset updates.
Datasets are a separate feature from object storage, which allows you to interact with files in cloud and local object storage systems. To learn more about using Airflow to interact with files, see Use Airflow object storage to interact with cloud storage in an ML pipeline.
To get the most out of this guide, you should have an existing knowledge of:
Datasets allow you to define explicit dependencies between DAGs and updates to your data. This helps you to:
You can define datasets in your DAG code and use them to create cross-DAG or even cross-Deployment dependencies. This section covers definitions for dataset terminology, as well as general information on how to use them.
Airflow uses the following terms related to the datasets feature:
Dataset: an object defined by a unique URI. If the identifier is not a valid URI, prefix it with x- for Airflow to treat it as a string. See What is a valid URI? for detailed information.
Dataset event: an event attached to a dataset, created whenever a producer task updates that dataset. A dataset event can optionally contain an extra dictionary with additional information about the dataset or dataset event.
Producer task: a task that produces updates to one or more datasets provided to its outlets parameter, creating dataset events when it completes successfully.
Dataset expression: a logical expression using AND (&) and OR (|) operators to define the schedule of a DAG scheduled on updates to several datasets.
Metadata: a class used to attach extra information to a dataset from within the producer task. This functionality can be used to pass dataset-related metadata between tasks; see Attaching information to a dataset event.
Two parameters relating to Airflow datasets exist in all Airflow operators and decorators:
outlets: the datasets a task produces updates to. A dataset event is created for each of them when the task completes successfully.
inlets: the datasets whose events a task can access, including the extra information from related dataset events. Defining inlets for a task does not affect the schedule of the DAG containing the task and the relationship is not reflected in the Airflow UI.
To summarize, tasks produce updates to datasets given to their outlets parameter, and this action creates dataset events. DAGs can be scheduled based on dataset events created for one or more datasets, and tasks can be given access to all events attached to a dataset by defining the dataset as one of their inlets. A dataset is defined as an object in the Airflow metadata database as soon as it is referenced in either the outlets parameter of a task or the schedule of a DAG.
When you work with datasets, keep the following considerations in mind:
As of Airflow 2.8, you can use listeners to enable Airflow to run any code when certain dataset events occur anywhere in your Airflow instance. There are two listener hooks for the following events:
For examples, refer to our Create Airflow listeners tutorial. Dataset Events listeners are an experimental feature.
A dataset is defined as an object in the Airflow metadata database as soon as it is referenced in either the outlets parameter of a task or the schedule of a DAG. Airflow 2.10 added the ability to create dataset aliases; see Use Dataset Aliases.
The simplest dataset schedule is one DAG scheduled on updates to one dataset that is updated by one task. In this example, the my_producer_task task in the my_producer_dag DAG produces updates to the s3://my-bucket/my-key/ dataset, creating dataset events, and the my_consumer_dag DAG is scheduled to run once for every dataset event created.
First, provide the dataset to the outlets parameter of the producer task.
You can see the relationship between the DAG containing the producing task (my_producer_dag) and the dataset in the Dependency Graph located in the Datasets tab of the Airflow UI. Note that this screenshot is using Airflow 2.10 and the UI might look different in previous versions.

In Airflow 2.9+ the graph view of the my_producer_dag shows the dataset as well.

Next, schedule the my_consumer_dag to run as soon as a new dataset event is produced to the s3://my-bucket/my-key/ dataset.
You can see the relationship between the DAG containing the producing task (my_producer_dag), the consuming DAG my_consumer_dag and the dataset in the Dependency Graph located in the Datasets tab of the Airflow UI. Note that this screenshot is using Airflow 2.10 and the UI might look different in previous versions.

In Airflow 2.9+ the graph view of the my_consumer_dag shows the dataset as well.

After unpausing the my_consumer_dag, every successful completion of the my_producer_task task triggers a run of the my_consumer_dag.

In Airflow 2.10+ the producing task will list the Dataset Events it caused in its details page, including links to the Triggered Dag Runs.

The triggered DAG run of the my_consumer_dag also lists the dataset event, including a link to the source DAG from within which the dataset event was created.

In Airflow 2.10+ you have the option to create dataset aliases to schedule DAGs based on datasets with URIs generated at runtime. A dataset alias is defined by a unique name string and can be used in place of a regular dataset in outlets and schedules.
Any number of dataset events updating different datasets can be attached to a dataset alias.
There are two ways to add a dataset event to a dataset alias:
Using the Metadata class.
Using outlet_events pulled from the Airflow context.
In both cases, the URI of the dataset is determined at runtime inside the producing task.
In the consuming DAG you can use a dataset alias in place of a regular dataset.
Since the dataset event is generated at runtime with a dynamic URI, Airflow does not know in advance which dataset will trigger the run of the my_consumer_dag. The Airflow UI displays Unresolved DatasetAlias as the DAG schedule for DAGs that are only scheduled on aliases that have never had a dataset event attached to them.

Once the my_producer_dag containing the attach_event_to_alias_metadata task completes successfully, reparsing of all DAGs scheduled on the dataset alias my_alias is automatically triggered. This reparsing step attaches the s3://my-bucket/my-task dataset to the my_alias dataset alias and the schedule resolves, triggering one run of the my_consumer_dag.

Any further dataset event for the s3://my-bucket/my-task dataset will now trigger the my_consumer_dag. If you attach dataset events for several datasets to the same dataset alias, a DAG scheduled on that dataset alias will run as soon as any of the datasets that were ever attached to the dataset alias receives an update.
See Dynamic data events emitting and dataset creation through DatasetAlias for more information and examples of using dataset aliases.
To use Dataset Aliases with traditional operators, you need to attach the dataset event to the alias inside the operator logic. If you are using operators besides the PythonOperator, you can either do so in a custom operator’s .execute method or by passing a post_execute callable to existing operators (experimental). Use outlet_events when attaching dataset events to aliases in traditional or custom operators. Note that for deferrable operators, attaching a dataset event to an alias is only supported in the execute_complete or post_execute method.
As of Airflow 2.9, there are three ways to update a dataset:
A task with an outlets parameter that references the dataset completes successfully.
A POST request to the datasets endpoint of the Airflow REST API.
A manual update in the Airflow UI.
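The REST API option can be sketched with the standard library alone. The sketch assumes the Airflow 2.9+ stable REST API, where dataset events are created with a POST to /api/v1/datasets/events and a body containing dataset_uri and an optional extra. The base URL, the credentials, and the use of basic authentication are assumptions about the deployment, and build_dataset_event_request is a hypothetical helper.

```python
import base64
import json
import urllib.request


def build_dataset_event_request(base_url, dataset_uri, extra, username, password):
    """Build (but do not send) a request that creates a dataset event."""
    payload = json.dumps({"dataset_uri": dataset_uri, "extra": extra}).encode("utf-8")
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/datasets/events",
        data=payload,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


request = build_dataset_event_request(
    base_url="http://localhost:8080",  # assumed local webserver
    dataset_uri="s3://my-bucket/my-key/",
    extra={"row_count": 42},
    username="admin",  # assumed credentials
    password="admin",
)
# Sending is left to the caller, for example: urllib.request.urlopen(request)
```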

When updating a dataset in the Airflow UI or making a POST request to the Airflow REST API, you can attach extra information to the dataset event by providing an extra JSON payload. Airflow 2.10 added the ability to add extra information from within the producing task, using either the Metadata class or outlet_events from the Airflow context. You can attach any information to the extra that was computed within the task, for example information about the dataset you are working with.
To attach information to a dataset with the Metadata class, yield a Metadata object from the producer task. Make sure that the dataset used in the Metadata class is also defined as an outlet in the producer task.
You can also access the outlet_events from the Airflow context directly to add an extra dictionary to a dataset event.
Dataset extras can be viewed in the Airflow UI in the Dataset Events list on the producing task, consuming DAG run, as well as in the Datasets tab.

Extras can be programmatically retrieved from within Airflow tasks. Any Airflow task instance in a DAG run has access to the list of datasets that were involved in triggering that specific DAG run (triggering_dataset_events). Additionally, you can give any Airflow task access to all dataset events of a specific dataset by providing the dataset to the task’s inlets parameter. Defining inlets does not affect the schedule of the DAG.
To access all dataset events that were involved in triggering a DAG run within a TaskFlow API task, pull triggering_dataset_events from the Airflow context. In a traditional operator, you can use Jinja templating in any templateable field of the operator to pull information from the Airflow context.
If you want to access dataset extras independently of which dataset events triggered a DAG run, you can provide a dataset directly to a task as an inlet. In a TaskFlow API task, you can fetch inlet_events from the Airflow context; in a traditional operator, you can use Jinja templating to access them.
Note that you can programmatically retrieve information from dataset aliases as well. See Fetching information from previously emitted dataset events through resolved dataset aliases for more information.
Any number of datasets can be provided to the schedule parameter. There are 3 types of dataset schedules:
schedule=[Dataset("a"), Dataset("b")]: Providing one or more Datasets as a list. The DAG is scheduled to run after all Datasets in the list have received at least one update.
schedule=(Dataset("a") | Dataset("b")): (Airflow 2.9+) Using AND (&) and OR (|) operators to create a conditional dataset expression. Note that dataset expressions are enclosed in parentheses ().
DatasetOrTimeSchedule: (Airflow 2.9+) Combining time-based scheduling with dataset expressions; see combined dataset and time-based scheduling.
When scheduling DAGs based on datasets, keep the following in mind:
If task1 and task2 both produce dataset_a, a consumer DAG of dataset_a runs twice: first when task1 completes, and again when task2 completes.
To get information about the dataset events that triggered a DAG run, pull triggering_dataset_events from the context. This parameter provides a list of all the triggering dataset events with the parameters [timestamp, source_dag_id, source_task_id, source_run_id, source_map_index]. See Retrieving information in a downstream task for an example.
In Airflow 2.9 and later, you can use logical operators to combine any number of datasets provided to the schedule parameter. The logical operators supported are | for OR and & for AND.
For example, to schedule a DAG on an update to either dataset1, dataset2, dataset3, or dataset4, you can use the following syntax. Note that the full statement is wrapped in ().
The downstream1_on_any DAG is triggered whenever any of the datasets dataset1, dataset2, dataset3, or dataset4 is updated. When clicking on x of 4 Datasets updated in the DAGs view, you can see the dataset expression that defines the schedule.

You can also combine the logical operators to create more complex expressions. For example, to schedule a DAG on an update to either dataset1 or dataset2 and either dataset3 or dataset4, you can use the following syntax:
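The dataset expression this schedule creates joins two OR groups with AND: the DAG runs once at least one of dataset1 and dataset2 has been updated and at least one of dataset3 and dataset4 has been updated.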
In Airflow 2.9 and later, you can combine dataset-based scheduling with time-based scheduling by using the DatasetOrTimeSchedule timetable. A DAG scheduled with this timetable runs either when its timetable condition is met or when its dataset condition is met.
The DAG shown below runs on a time-based schedule defined by the 0 0 * * * cron expression, which is every day at midnight. The DAG also runs when either dataset3 or dataset4 is updated.
In the following example, the write_instructions_to_file and write_info_to_file tasks are both producer tasks because they have defined outlets.
A consumer DAG runs whenever the dataset(s) it is scheduled on is updated by a producer task, rather than running on a time-based schedule. For example, if you have a DAG that should run when the INSTRUCTIONS and INFO datasets are updated, you define the DAG’s schedule using the names of those two datasets.
Any DAG that is scheduled with a dataset is considered a consumer DAG even if that DAG doesn’t actually access the referenced dataset. In other words, it’s up to you as the DAG author to correctly reference and use datasets.