Skip to main content

Datasets and data-aware scheduling in Airflow

With Datasets, DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to these datasets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.

Datasets can help resolve common issues. For example, consider a data engineering team with a DAG that creates a dataset and a machine learning team with a DAG that trains a model on the dataset. Using datasets, the machine learning team's DAG runs only when the data engineering team's DAG has produced an update to the dataset.

In this guide, you'll learn about datasets in Airflow and how to use them to implement triggering of DAGs based on dataset updates.

info

Datasets are a separate feature from object storage, which allows you to interact with files in cloud and local object storage systems. To learn more about using Airflow to interact with files, see Use Airflow object storage to interact with cloud storage in an ML pipeline.

Other ways to learn

There are multiple resources for learning about this topic. See also:

Assumed knowledge

To get the most out of this guide, you should have an existing knowledge of:

Why use Airflow datasets?

Datasets allow you to define explicit dependencies between DAGs and updates to your data. This helps you to:

  • Standardize communication between teams. Datasets can function like an API to communicate when data in a specific location has been updated and is ready for use.
  • Reduce the amount of code necessary to implement cross-DAG dependencies. Even if your DAGs don't depend on data updates, you can create a dependency that triggers a DAG after a task in another DAG updates a dataset.
  • Get better visibility into how your DAGs are connected and how they depend on data. The Datasets tab in the Airflow UI shows a graph of all dependencies between DAGs and datasets in your Airflow environment.
  • Reduce costs, because datasets do not use a worker slot in contrast to sensors or other implementations of cross-DAG dependencies.
  • Create cross-deployment dependencies using the Airflow REST API. Astro customers can use the Cross-deployment dependencies best practices documentation for guidance.
  • (Airflow 2.9+) Create complex data-driven schedules using Conditional Dataset Scheduling and Combined Dataset and Time-based Scheduling.

Dataset concepts

You can define datasets in your DAG code and use them to create cross-DAG or even cross-Deployment dependencies. This section covers definitions for dataset terminology, as well as general information on how to use them.

Dataset terminology

You can define datasets in your DAG code and use them to create cross-DAG dependencies. Airflow uses the following terms related to the datasets feature:

  • Dataset: an object that is defined by a unique URI. Airflow parses the URI for validity and there are some constraints on how you can define it. If you want to avoid validity parsing, prefix your dataset name with x- for Airflow to treat it as a string. See What is a valid URI? for detailed information.
  • Dataset event: an event that is attached to a dataset and created whenever a producer task updates that particular dataset. A dataset event is defined by being attached to a specific dataset plus the timestamp of when a producer task updated the dataset. Optionally, a dataset event can contain an extra dictionary with additional information about the dataset or dataset event.
  • Dataset schedule: the schedule of a DAG that is triggered as soon as dataset events for one or more datasets are created. All datasets a DAG is scheduled on are shown in the DAG graph in the Airflow UI, as well as reflected in the dependency graph of the Datasets tab.
  • Producer task: a task that produces updates to one or more datasets provided to its outlets parameter, creating dataset events when it completes successfully.
  • Dataset expression: (Airflow 2.9+) a logical expression using AND (&) and OR (|) operators to define the schedule of a DAG scheduled on updates to several datasets.
  • Queued dataset event: It is common to have DAGs scheduled to run as soon as a set of datasets have received at least one update each. While there are still dataset events missing to trigger the DAG, all dataset events for other datasets the DAG is scheduled on are queued dataset events. A queued dataset event is defined by its dataset, timestamp and the DAG it is queuing for. One dataset event can create a queued dataset event for several DAGs. As of Airflow 2.9, you can access queued Dataset events for a specific DAG or a specific dataset programmatically, using the Airflow REST API.
  • DatasetAlias (Airflow 2.10+): an object that can be associated to one or more datasets and used to create schedules based on datasets created at runtime, see Use dataset aliases. A dataset alias is defined by a unique name.
  • Metadata (Airflow 2.10+): a class to attach extra information to a dataset from within the producer task. This functionality can be used to pass dataset-related metadata between tasks, see Attaching information to a dataset event.

Two parameters relating to Airflow datasets exist in all Airflow operators and decorators:

  • Outlets: a task parameter that contains the list of datasets a specific tasks produces updates to, as soon as it completes successfully. All outlets of a task are shown in the DAG graph in the Airflow UI, as well as reflected in the dependency graph of the Datasets tab as soon as the DAG code is parsed, i.e. independently of whether or not any dataset events have occurred. Note that Airflow is not yet aware of the underlying data. It is up to you to determine which tasks should be considered producer tasks for a dataset. As long as a task has an outlet dataset, Airflow considers it a producer task even if that task doesn't operate on the referenced dataset.
  • Inlets: a task parameter that contains the list of datasets a specific task has access to, typically to access extra information from related dataset events. Defining inlets for a task does not affect the schedule of the DAG containing the task and the relationship is not reflected in the Airflow UI.

To summarize, tasks produce updates to datasets given to their outlets parameter, and this action creates dataset events. DAGs can be scheduled based on dataset events created for one or more datasets, and tasks can be given access to all events attached to a dataset by defining the dataset as one of their inlets. A dataset is defined as an object in the Airflow metadata database as soon as it is referenced in either, the outlets parameter of a task or the schedule of a DAG.

Using datasets

When you work with datasets, keep the following considerations in mind:

  • Datasets events are only registered by DAGs or listeners in the same Airflow environment. If you want to create cross-Deployment dependencies with Datasets you will need to use the Airflow REST API to create a dataset event in the Airflow environment where your downstream DAG is located. See the Cross-deployment dependencies for an example implementation on Astro.
  • Airflow monitors datasets only within the context of DAGs and tasks. It does not monitor updates to datasets that occur outside of Airflow. I.e. Airflow will not notice if you manually add a file to an S3 bucket referenced by a dataset. To create Airflow dependencies based on outside events, use Airflow sensors.
  • The Datasets tab in the Airflow UI provides an overview over recent dataset events, existing datasets as well as a graph showing all dependencies between DAGs containing producing tasks, datasets and consuming DAGs. See Datasets tab for more information.
Listening for dataset changes

As of Airflow 2.8, you can use listeners to enable Airflow to run any code when certain dataset events occur anywhere in your Airflow instance. There are two listener hooks for the following events:

  • on_dataset_created
  • on_dataset_changed

For examples, refer to our Create Airflow listeners tutorial. Dataset Events listeners are an experimental feature.

Dataset definition

A dataset is defined as an object in the Airflow metadata database as soon as it is referenced in either the outlets parameter of a task or the schedule of a DAG. Airflow 2.10 added the ability to create dataset aliases, see Use Dataset Aliases.

Basic Dataset definition

The simplest dataset schedule is one DAG scheduled based on updates to one dataset which is produced to by one task. In this example we define that the my_producer_task task in the my_producer_dag DAG produces updates to the s3://my-bucket/my-key/ dataset, creating attached dataset events, and schedule the my_consumer_dag DAG to run once for every dataset event created.

First, provide the dataset to the outlets parameter of the producer task.

from airflow.decorators import dag, task
from airflow.datasets import Dataset


@dag(
start_date=None,
schedule=None,
catchup=False,
)
def my_producer_dag():

@task(outlets=[Dataset("s3://my-bucket/my-key/")])
def my_producer_task():
pass

my_producer_task()


my_producer_dag()

You can see the relationship between the DAG containing the producing task (my_producer_dag) and the dataset in the Dependency Graph located in the Datasets tab of the Airflow UI. Note that this screenshot is using Airflow 2.10 and the UI might look different in previous versions.

Screenshot of the Dependency Graph of the Datasets tab showing my_producer_dag connected to the s3://my-bucket/my-key/ dataset.

In Airflow 2.9+ the graph view of the my_producer_dag shows the dataset as well.

Screenshot of a DAG Graph showing my_producer_task connected to the s3://my-bucket/my-key/ dataset.

Next, schedule the my_consumer_dag to run as soon as a new dataset event is produced to the s3://my-bucket/my-key/ dataset.

from airflow.decorators import dag
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator
from pendulum import datetime


@dag(
start_date=datetime(2024, 8, 1),
schedule=[Dataset("s3://my-bucket/my-key/")],
catchup=False,
)
def my_consumer_dag():

EmptyOperator(task_id="empty_task")


my_consumer_dag()

You can see the relationship between the DAG containing the producing task (my_producer_dag), the consuming DAG my_consumer_dag and the dataset in the Dependency Graph located in the Datasets tab of the Airflow UI. Note that this screenshot is using Airflow 2.10 and the UI might look different in previous versions.

Screenshot of the Dependency Graph of the Datasets tab showing my_producer_dag connected to the s3://my-bucket/my-key/ dataset which is connected to my_consumer_dag

In Airflow 2.9+ the graph view of the my_consumer_dag shows the dataset as well.

Screenshot of a DAG Graph showing my_producer_task connected to the s3://my-bucket/my-key/ dataset.

After unpausing the my_consumer_dag, every successful completion of the my_producer_task task triggers a run of the my_consumer_dag.

Screenshot DAGs page with one run each of the my_producer_dag and my_consumer_dag as well as the dataset schedule displayed

In Airflow 2.10+ the producing task will list the Dataset Events it caused in its details page, including links to the Triggered Dag Runs.

Screenshot of the Details tab of the my_producer_task showing one Dataset event of the s3://my-bucket/my-key/ with one Triggered Dag Run

The triggered DAG run of the my_consumer_dag also lists the dataset event, including a link to the source dag from within which the dataset event was created.

Screenshot of the Details tab of the DAG run of the my_consumer_dag showing one Dataset event of the s3://my-bucket/my-key/

Use dataset aliases

In Airflow 2.10+ you have the option to create dataset aliases to schedule DAGs based on datasets with URIs generated at runtime. A dataset alias is defined by a unique name string and can be used in place of a regular dataset in outlets and schedules. Any number of dataset events updating different datasets can be attached to a dataset alias.

There are two ways to add a dataset event to a dataset alias:

  • Using the Metadata class.
  • Using outlet_events pulled from the Airflow context.

See the code below for examples, note how the URI of the dataset is determined at runtime inside the producing task.

# from airflow.decorators import task
# from airflow.datasets import Dataset, DatasetAlias
# from airflow.datasets.metadata import Metadata

my_alias_name = "my_alias"


@task(outlets=[DatasetAlias(my_alias_name)])
def attach_event_to_alias_metadata():
bucket_name = "my-bucket" # determined at runtime, for example based on upstream input
yield Metadata(
Dataset(f"s3://{bucket_name}/my-task"),
extra={"k": "v"}, # extra has to be provided, can be {}
alias=my_alias_name,
)

attach_event_to_alias_metadata()

In the consuming DAG you can use a dataset alias in place of a regular dataset.

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.datasets import Dataset
from pendulum import datetime

my_alias_name = "my_alias"

@dag(
start_date=datetime(2024, 8, 1),
schedule=[DatasetAlias(my_alias_name)],
catchup=False,
)
def my_consumer_dag():

EmptyOperator(task_id="empty_task")


my_consumer_dag()

Since the dataset event is generated at runtime with a dynamic URI, Airflow does not know in advance which dataset will trigger the run of the my_consumer_dag. The Airflow UI displays Unresolved DatasetAlias as the DAG schedule for DAGs that are only scheduled on aliases that have never had a dataset event attached to them.

Screenshot the DAGs view showing an Unresolved DatasetAlias schedule on my_consumer_dag.

Once the my_producer_dag containing the attach_event_to_alias_metadata task completes successfully, reparsing of all DAGs scheduled on the dataset alias my_alias is automatically triggered. This reparsing step attaches the s3://my-bucket/my-task dataset to the my_alias dataset alias and the schedule resolves, triggering one run of the my_consumer_dag.

Screenshot the DAGs view showing an the resolved dataset schedule and one successful run each for the my_producer_dag and my_consumer_dag.

Any further dataset event for the s3://my-bucket/my-task dataset will now trigger the my_consumer_dag. If you attach dataset events for several datasets to the same dataset alias, a DAG scheduled on that dataset alias will run as soon as any of the datasets that were ever attached to the dataset alias receive an update.

See Dynamic data events emitting and dataset creation through DatasetAlias for more information and examples of using dataset aliases.

To use Dataset Aliases with traditional operators, you need to attach the dataset event to the alias inside the operator logic. If you are using operators besides the PythonOperator, you can either do so in a custom operator's .execute method or by passing a post_execute callable to existing operators (experimental). Use outlet_events when attaching dataset events to aliases in traditional or custom operators. Note that for deferrable operators, attaching a dataset event to an alias is only supported in the execute_complete or post_execute method.

    def _attach_event_to_alias(context, result):  # result = the return value of the execute method
# use any logic to determine the URI
uri = "s3://my-bucket/my_file.txt"
context["outlet_events"][my_alias_name].add(Dataset(uri))

BashOperator(
task_id="t2",
bash_command="echo hi",
outlets=[DatasetAlias(my_alias_name)],
post_execute=_attach_event_to_alias, # using the post_execute parameter is experimental
)
Click to view an example of a custom operator attaching a dataset event to a dataset alias.
"""
### Dataset Alias in a custom operator
"""

from airflow.decorators import dag
from airflow.datasets import Dataset, DatasetAlias
from pendulum import datetime
import logging

t_log = logging.getLogger("airflow.task")

my_alias_name = "my-alias"

# import the operator to inherit from
from airflow.models.baseoperator import BaseOperator


# custom operator producing to a dataset alias
class MyOperator(BaseOperator):
"""
Simple example operator that attaches a dataset event to a dataset alias.
:param my_bucket_name: (str) The name of the bucket to use in the dataset URI.
"""

# define the .__init__() method that runs when the DAG is parsed
def __init__(self, my_bucket_name, my_alias_name, *args, **kwargs):
# initialize the parent operator
super().__init__(*args, **kwargs)
# assign class variables
self.my_bucket_name = my_bucket_name
self.my_alias_name = my_alias_name

def execute(self, context):

# add your custom operator logic here

# use any logic to derive the dataset URI
my_uri = f"s3://{self.my_bucket_name}/my_file.txt"
context["outlet_events"][self.my_alias_name].add(Dataset(my_uri))

return "hi :)"

# define the .post_execute() method that runs after the execute method (optional)
# result is the return value of the execute method
def post_execute(self, context, result=None):
# write to Airflow task logs
self.log.info("Post-execution step")

# It is also possible to add events to the alias in the post_execute method


@dag(
start_date=datetime(2024, 8, 1),
schedule=None,
catchup=False,
doc_md=__doc__,
)
def dataset_alias_custom_operator():

MyOperator(
task_id="t1",
my_bucket_name="my-bucket",
my_alias_name=my_alias_name,
outlets=[DatasetAlias(my_alias_name)],
)


dataset_alias_custom_operator()

Updating a dataset

As of Airflow 2.9+ there are three ways to update a dataset:

Attaching information to a dataset event

When updating a dataset in the Airflow UI or making a POST request to the Airflow REST API, you can attach extra information to the dataset event by providing an extra json payload. Airflow 2.10 added the possibility to add extra information from within the producing task using either the Metadata class or accessing outlet_events from the Airflow context. You can attach any information to the extra that was computed within the task, for example information about the dataset you are working with.

To use the Metadata class to attach information to a dataset, follow the example in the code snippet below. Make sure that the dataset used in the metadata class is also defined as an outlet in the producer task.

# from airflow.decorators import task
# from airflow.datasets import Dataset
# from airflow.datasets.metadata import Metadata

my_dataset_1 = Dataset("x-dataset1")

@task(outlets=[my_dataset_1])
def attach_extra_using_metadata():
num = 23
yield Metadata(my_dataset_1, {"myNum": num})

return "hello :)"

attach_extra_using_metadata()

You can also access the outlet_events from the Airflow context directly to add an extra dictionary to a dataset event.

# from airflow.decorators import task
# from airflow.datasets import Dataset
# from airflow.datasets.metadata import Metadata

my_dataset_2 = Dataset("x-dataset2")

@task(outlets=[my_dataset_2])
def use_outlet_events(**context):
num = 19
context["outlet_events"][my_dataset_2].extra = {"my_num": num}

return "hello :)"

use_outlet_events()

Dataset extras can be viewed in the Airflow UI in the Dataset Events list on the producing task, consuming DAG run, as well as in the Datasets tab.

Screenshot of the Dataset Events list under the Datasets tab in the Airflow UI showing two datasets with one extra each

Retrieving dataset information in a downstream task

Extras can be programmatically retrieved from within Airflow tasks. Any Airflow task instance in a DAG run has access to the list of datasets that were involved in triggering that specific DAG run (triggering_dataset_events). Additionally, you can give any Airflow task access to all dataset events of a specific dataset by providing the dataset to the task's inlets parameter. Defining inlets does not affect the schedule of the DAG.

To access the all dataset events that were involved in triggering a DAG run within a TaskFlow API task, simply pull it from the Airflow context. In a traditional operator, you can use Jinja templating in any templateable field of the operator to pull information from the Airflow context.

# from airflow.decorators import task

@task
def get_extra_triggering_run(**context):
# all events that triggered this specific DAG run
triggering_dataset_events = context["triggering_dataset_events"]
# the loop below wont run if the DAG is manually triggered
for dataset, dataset_list in triggering_dataset_events.items():
print(dataset, dataset_list)
print(dataset_list[0].extra)
# you can also fetch the run_id and other information about the upstream DAGs,
# note that this will error if the Dataset was updated via the API!
print(dataset_list[0].source_dag_run.run_id)

If you want to access dataset extras independently from which dataset events triggered a DAG run, you have the option to directly provide a dataset to a task as an inlet. In a TaskFlow API task you can fetch the inlet_events from the Airflow context, in a traditional operator you can use Jinja templating to access them.

# from airflow.decorators import task
# from airflow.datasets import Dataset

my_dataset_2 = Dataset("x-dataset2")

# note that my_dataset_2 does not need to be part of the DAGs schedule
# you can provide as many inlets as you wish
@task(inlets=[my_dataset_2])
def get_extra_inlet(**context):
# inlet_events are listed earliest to latest by timestamp
dataset_events = context["inlet_events"][my_dataset_2]
# protect against the dataset not existing
if len(dataset_events) == 0:
print(f"No dataset_events for {my_dataset_2.uri}")
else:
# accessing the latest dataset event for this dataset
# if the extra does not exist, return None
my_num = dataset_events[-1].extra.get("myNum", None)
print(my_num)

get_extra_inlet()

Note that you can programmatically retrieve information from dataset aliases as well, see Fetching information from previously emitted dataset events through resolved dataset aliases for more information.

Dataset schedules

Any number of datasets can be provided to the schedule parameter. There are 3 types of dataset schedules:

  • schedule=[Dataset("a"), Dataset("b")]: Providing one or more Datasets as a list. The DAG is scheduled to run after all Datasets in the list have received at least one update.
  • schedule=(Dataset("a") | Dataset("b")): (Airflow 2.9+) Using AND (&) and OR (|) operators to create a conditional dataset expression. Note that dataset expressions are enclosed in smooth brackets ().
  • DatasetOrTimeSchedule: (Airflow 2.9+) Combining time based scheduling with dataset expressions, see combined dataset and time-based scheduling.

When scheduling DAGs based on datasets, keep the following in mind:

  • Consumer DAGs that are scheduled on a dataset are triggered every time a task that updates that dataset completes successfully. For example, if task1 and task2 both produce dataset_a, a consumer DAG of dataset_a runs twice - first when task1 completes, and again when task2 completes.
  • Consumer DAGs scheduled on a dataset are triggered as soon as the first task with that dataset as an outlet finishes, even if there are downstream producer tasks that also operate on the dataset.
  • Consumer DAGs scheduled on multiple datasets run as soon as their expression is fulfilled by at least one dataset event per dataset in the expression. This means that it does not matter to the consuming DAG whether a dataset received additional updates in the meantime, it consumes all queued events for one dataset as one input. See Multiple Datasets for more information.
  • As of Airflow 2.10 a consumer DAG that is paused will ignore all updates to datasets that occurred while it was paused. Meaning, it starts with a blank slate upon being unpaused. In previous Airflow versions, a consumer DAG scheduled on one dataset that had received an update while the DAG was paused would run immediately when being unpaused.
  • DAGs that are triggered by datasets do not have the concept of a data interval. If you need information about the triggering event in your downstream DAG, you can use the parameter triggering_dataset_events from the context. This parameter provides a list of all the triggering dataset events with the parameters [timestamp, source_dag_id, source_task_id, source_run_id, source_map_index ]. See Retrieving information in a downstream task for an example.

Conditional dataset scheduling

In Airflow 2.9 and later, you can use logical operators to combine any number of datasets provided to the schedule parameter. The logical operators supported are | for OR and & for AND.

For example, to schedule a DAG on an update to either dataset1, dataset2, dataset3, or dataset4, you can use the following syntax. Note that the full statement is wrapped in ().

from airflow.decorators import dag
from airflow.models.datasets import Dataset
from pendulum import datetime

@dag(
start_date=datetime(2024, 3, 1),
schedule=(
Dataset("dataset1")
| Dataset("dataset2")
| Dataset("dataset3")
| Dataset("dataset4")
), # Use () instead of [] to be able to use conditional dataset scheduling!
catchup=False,
)
def downstream1_on_any():

# your tasks here

downstream1_on_any()

The downstream1_on_any DAG is triggered whenever any of the datasets dataset1, dataset2, dataset3, or dataset4 are updated. When clicking on x of 4 Datasets updated in the DAGs view, you can see the dataset expression that defines the schedule.

Screenshot of the Airflow UI with a pop up showing the dataset expression for the downstream1_on_any DAG listing the 4 datasets under "any"

You can also combine the logical operators to create more complex expressions. For example, to schedule a DAG on an update to either dataset1 or dataset2 and either dataset3 or dataset4, you can use the following syntax:

from airflow.decorators import dag
from airflow.models.datasets import Dataset
from pendulum import datetime

@dag(
start_date=datetime(2024, 3, 1),
schedule=(
(Dataset("dataset1") | Dataset("dataset2"))
& (Dataset("dataset3") | Dataset("dataset4"))
), # Use () instead of [] to be able to use conditional dataset scheduling!
catchup=False
)
def downstream2_one_in_each_group():

# your tasks here

downstream2_one_in_each_group()

The dataset expression this schedule creates is:

{
"all": [
{
"any": [
"dataset1",
"dataset2"
]
},
{
"any": [
"dataset3",
"dataset4"
]
}
]
}

Combined dataset and time-based scheduling

In Airflow 2.9 and later, you can combine dataset-based scheduling with time-based scheduling with the DatasetOrTimeSchedule timetable. A DAG scheduled with this timetable will run either when its timetable condition is met or when its dataset condition is met.

The DAG shown below runs on a time-based schedule defined by the 0 0 * * * cron expression, which is every day at midnight. The DAG also runs when either dataset3 or dataset4 is updated.

from airflow.decorators import dag, task
from airflow.datasets import Dataset
from pendulum import datetime
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
start_date=datetime(2024, 3, 1),
schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
datasets=(Dataset("dataset3") | Dataset("dataset4")),
# Use () instead of [] to be able to use conditional dataset scheduling!
),
catchup=False,
)
def toy_downstream3_dataset_and_time_schedule():

# your tasks here

toy_downstream3_dataset_and_time_schedule()

Example implementation

In the following example, the write_instructions_to_file and write_info_to_file are both producer tasks because they have defined outlets.

from pendulum import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


@dag(
start_date=datetime(2022, 10, 1),
schedule=None,
catchup=False,
)
def datasets_producer_dag():
@task
def get_cocktail(api):
import requests

r = requests.get(api)
return r.json()

@task(outlets=[INSTRUCTIONS])
def write_instructions_to_file(response):
cocktail_name = response["drinks"][0]["strDrink"]
cocktail_instructions = response["drinks"][0]["strInstructions"]
msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"

f = open("include/cocktail_instructions.txt", "a")
f.write(msg)
f.close()

@task(outlets=[INFO])
def write_info_to_file(response):
import time

time.sleep(30)
cocktail_name = response["drinks"][0]["strDrink"]
cocktail_category = response["drinks"][0]["strCategory"]
alcohol = response["drinks"][0]["strAlcoholic"]
msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
f = open("include/cocktail_info.txt", "a")
f.write(msg)
f.close()

cocktail = get_cocktail(api=API)

write_instructions_to_file(cocktail)
write_info_to_file(cocktail)


datasets_producer_dag()

A consumer DAG runs whenever the dataset(s) it is scheduled on is updated by a producer task, rather than running on a time-based schedule. For example, if you have a DAG that should run when the INSTRUCTIONS and INFO datasets are updated, you define the DAG's schedule using the names of those two datasets.

Any DAG that is scheduled with a dataset is considered a consumer DAG even if that DAG doesn't actually access the referenced dataset. In other words, it's up to you as the DAG author to correctly reference and use datasets.

from pendulum import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


@dag(
dag_id="datasets_consumer_dag",
start_date=datetime(2022, 10, 1),
schedule=[INSTRUCTIONS, INFO], # Scheduled on both Datasets
catchup=False,
)
def datasets_consumer_dag():
@task
def read_about_cocktail():
cocktail = []
for filename in ("info", "instructions"):
with open(f"include/cocktail_{filename}.txt", "r") as f:
contents = f.readlines()
cocktail.append(contents)

return [item for sublist in cocktail for item in sublist]

read_about_cocktail()


datasets_consumer_dag()

Was this page helpful?