Version: Airflow 3.x

Assets and data-aware scheduling in Airflow

With Assets, DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to these assets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.

Assets can help resolve common issues. For example, consider a data engineering team with a DAG that creates an asset and a machine learning team with a DAG that trains a model on the asset. Using assets, the machine learning team's DAG runs only when the data engineering team's DAG has produced an update to the asset.

In this guide, you'll learn:

  • When to use assets in Airflow.
  • How to use the @asset syntax to create data-oriented pipelines.
  • How to define Airflow tasks as producers of assets.
  • How to run DAGs based on basic and advanced asset schedules.
  • How to use asset aliases to create dynamic asset schedules.
  • How to attach information to, and retrieve information from, asset events.
info

Assets can be used to schedule a DAG based on messages in a message queue. This sub-type of data-aware scheduling is called event-driven scheduling. See Schedule DAGs based on Events in a Message Queue for more information.

info

Assets are a separate feature from object storage, which allows you to interact with files in cloud and local object storage systems. To learn more about using Airflow to interact with files, see Use Airflow object storage to interact with cloud storage in an ML pipeline.

Assumed knowledge

To get the most out of this guide, you should have existing knowledge of:

@asset syntax

The @asset decorator is a shorthand to create one DAG with one task that produces an asset. This decorator is used in the asset-oriented approach to writing DAGs, which puts the data asset front and center. Whether to use the asset-oriented or the task-oriented approach to writing DAGs is a matter of preference. DAGs created using the asset-oriented approach are listed like any other DAG in the Airflow UI.

The following code snippet defines a DAG with the DAG ID my_asset that runs on a @daily schedule. It contains one task with the task ID my_asset that, upon successful completion, updates an asset with the name my_asset.

from airflow.sdk import asset 

@asset(schedule="@daily")
def my_asset():
    # your task logic here
    pass

You can schedule assets based on other assets to create data-centric pipelines. Since each @asset decorator creates its own DAG, data needs to be passed between assets via cross-DAG XComs. Below you can see a simple ETL pipeline written with the asset-oriented approach, followed by a task-oriented sketch of the same pipeline for comparison.

from airflow.sdk import asset


@asset(schedule="@daily")
def extracted_data():
    return {"a": 1, "b": 2}


@asset(schedule=extracted_data)
def transformed_data(context):
    data = context["ti"].xcom_pull(
        dag_id="extracted_data",
        task_ids="extracted_data",
        key="return_value",
        include_prior_dates=True,
    )
    return {k: v * 2 for k, v in data.items()}


@asset(schedule=transformed_data)
def loaded_data(context):
    data = context["task_instance"].xcom_pull(
        dag_id="transformed_data",
        task_ids="transformed_data",
        key="return_value",
        include_prior_dates=True,
    )
    summed_data = sum(data.values())
    print(f"Summed data: {summed_data}")

The code above creates three DAGs that depend on each other, each containing one task that updates one asset:

DAGs view showing 3 DAGs.
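For comparison, the same pipeline written with the task-oriented approach could look like the following sketch: a single DAG with three tasks, where only the final task declares an asset as an outlet. The DAG ID my_etl_dag and the asset name loaded_data are illustrative.

from airflow.sdk import Asset, dag, task


@dag(schedule="@daily")
def my_etl_dag():
    @task
    def extracted_data():
        return {"a": 1, "b": 2}

    @task
    def transformed_data(data):
        return {k: v * 2 for k, v in data.items()}

    @task(outlets=[Asset("loaded_data")])
    def loaded_data(data):
        print(f"Summed data: {sum(data.values())}")

    loaded_data(transformed_data(extracted_data()))


my_etl_dag()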

To update several assets from the same DAG written with the asset-oriented approach, you can use @asset.multi. The code example below creates one DAG with the DAG ID my_multi_asset that contains one task called my_multi_asset, which, upon successful completion, updates two assets with the names asset_a and asset_b.

from airflow.sdk import Asset, asset

@asset.multi(schedule="@daily", outlets=[Asset("asset_a"), Asset("asset_b")])
def my_multi_asset():
    pass

Asset concepts

Aside from using the @asset decorator, you can define assets in your task-oriented DAG code as well, and use them to create cross-DAG or even cross-Deployment dependencies representing the flow of concrete or abstract data objects through your DAGs. This section covers definitions for asset terminology, as well as general information on how to use them.

Asset terminology

You can define assets in your DAG code and use them to create cross-DAG dependencies. Airflow uses the following terms related to the assets feature:

  • Asset: an object in Airflow that represents a concrete or abstract data entity and is defined by a unique name. Optionally, a URI can be attached to the asset when it represents a concrete data entity, like a file in object storage or a table in a relational database (see the short sketch after this list).
  • @asset: a decorator that can be used to write a DAG with the asset-oriented approach, directly declaring the desired asset in Python with less boilerplate code. Using @asset creates a DAG with a single task that updates an asset. See @asset syntax for more information.
  • Asset event: an event that is attached to an asset and created whenever a producer task updates that particular asset. An asset event is defined by being attached to a specific asset plus the timestamp of when a producer task updated the asset. Optionally, an asset event can contain an extra dictionary with additional information about the asset or asset event.
  • Asset schedule: the schedule of a DAG that is triggered as soon as asset events for one or more assets are created. All assets a DAG is scheduled on are shown in the DAG graph in the Airflow UI, as well as reflected in the dependency graph of the Assets tab.
  • Producer task: a task that produces updates to one or more assets provided to its outlets parameter, creating asset events when it completes successfully.
  • Asset expression: a logical expression using AND (&) and OR (|) operators to define the schedule of a DAG scheduled on updates to several assets.
  • Queued asset event: It is common to have DAGs scheduled to run as soon as a set of assets has received at least one update each. While some of the asset events needed to trigger the DAG are still missing, all asset events for the other assets the DAG is scheduled on are queued asset events. A queued asset event is defined by its asset, its timestamp, and the DAG it is queued for. One asset event can create a queued asset event for several DAGs. You can access queued asset events for a specific DAG or a specific asset programmatically using the Airflow REST API.
  • AssetAlias: an object that can be associated with one or more assets and used to create schedules based on assets created at runtime, see Use asset aliases. An asset alias is defined by a unique name.
  • Metadata: a class to attach extra information to an asset from within the producer task. This functionality can be used to pass asset-related metadata between tasks, see Attaching information to an asset event.
  • AssetWatcher: a class that is used in event-driven scheduling to watch for a TriggerEvent caused by a message in a message queue.
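As a minimal sketch of the first two terms above, an asset can be defined by its name alone or, assuming the optional uri argument, with a URI that points at the concrete data entity. The names and URI below are illustrative.

from airflow.sdk import Asset

# an abstract asset, identified only by its unique name
report_ready = Asset("my_report_ready")

# a concrete asset with an optional URI pointing at the underlying data
orders_table = Asset(name="orders_table", uri="postgresql://my_db/my_schema/orders")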

Two parameters relating to Airflow assets exist in all Airflow operators and decorators:

  • Outlets: a task parameter that contains the list of assets a specific task produces updates to as soon as it completes successfully. All outlets of a task are shown in the DAG graph in the Airflow UI, as well as reflected in the dependency graph of the Assets tab, as soon as the DAG code is parsed, that is, independently of whether any asset events have occurred. Note that Airflow is not yet aware of the underlying data. It is up to you to determine which tasks should be considered producer tasks for an asset. As long as a task has an outlet asset, Airflow considers it a producer task even if that task doesn't operate on the referenced asset.
  • Inlets: a task parameter that contains the list of assets a specific task has access to, typically to access extra information from related asset events. Defining inlets for a task does not affect the schedule of the DAG containing the task and the relationship is not reflected in the Airflow UI.

To summarize, tasks produce updates to assets given to their outlets parameter, and this action creates asset events. DAGs can be scheduled based on asset events created for one or more assets, and tasks can be given access to all events attached to an asset by defining the asset as one of their inlets. An asset is registered as an object in the Airflow metadata database as soon as it is referenced in either the outlets parameter of a task or the schedule of a DAG.
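A minimal sketch tying these concepts together, with illustrative DAG and task IDs: the producer task lists the asset in its outlets, the consumer DAG is scheduled on the same asset, and a consumer task reads the asset's events through inlets.

from airflow.sdk import Asset, dag, task

my_asset = Asset("my_asset")


@dag
def producer_dag():
    @task(outlets=[my_asset])
    def update_asset():
        pass

    update_asset()


@dag(schedule=[my_asset])
def consumer_dag():
    @task(inlets=[my_asset])
    def read_asset_events(**context):
        # all asset events attached to my_asset, earliest to latest
        print(context["inlet_events"][my_asset])

    read_asset_events()


producer_dag()
consumer_dag()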

Why use Airflow assets?

Assets allow you to define explicit dependencies between DAGs and updates to your data. This helps you to:

  • Standardize communication between teams. Assets can function like an API to communicate when data in a specific location has been updated and is ready for use.
  • Reduce the amount of code necessary to implement cross-DAG dependencies. Even if your DAGs don't depend on data updates, you can create a dependency that triggers a DAG after a task in another DAG updates an asset.
  • Get better visibility into how your DAGs are connected and how they depend on data. The Assets graphs in the Airflow UI display how assets and DAGs depend on each other and can be used to navigate between them.
  • Reduce costs, because assets do not use a worker slot in contrast to sensors or other implementations of cross-DAG dependencies.
  • Create cross-deployment dependencies using the Airflow REST API. Astro customers can use the Cross-deployment dependencies best practices documentation for guidance.
  • Create complex data-driven schedules using Conditional Asset Scheduling and Combined Asset and Time-based Scheduling.
  • Schedule a DAG based on messages in a message queue with event-driven scheduling.

Using assets

When you work with assets, keep the following considerations in mind:

  • Asset events are only registered by DAGs or listeners in the same Airflow environment. If you want to create cross-Deployment dependencies with assets, you need to use the Airflow REST API to create an asset event in the Airflow environment where your downstream DAG is located. See Cross-deployment dependencies for an example implementation on Astro.
  • Airflow monitors assets only within the context of DAGs and tasks. It does not monitor updates to assets that occur outside of Airflow. For example, Airflow will not notice if you manually add a file to an S3 bucket referenced by an asset. To create Airflow dependencies based on outside events, use Airflow sensors or deferrable operators, or consider using a message queue as an intermediary and implement event-driven scheduling.
  • The Assets tab in the Airflow UI provides a list of active assets in your Airflow environment with an asset graph for each asset showing its dependencies to DAGs and other assets.
Listening for asset changes

You can use listeners to enable Airflow to run any code when certain asset events occur anywhere in your Airflow instance. Listener hooks exist for the following events:

  • on_asset_created
  • on_asset_alias_created
  • on_asset_updated

For examples, refer to our Create Airflow listeners tutorial. Note that listeners are an advanced feature of Airflow. They are not isolated from the Airflow components they run in and can slow down or, in some cases, take down your Airflow instance. As such, take extra care when writing listeners. Asset event listeners are an experimental feature.
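As an illustration only, a listener can be registered through an Airflow plugin roughly as in the sketch below. The hook signature and the class-based layout are assumptions based on the hook names listed above; refer to the listeners documentation for the exact interfaces.

# plugins/asset_listener_plugin.py -- a sketch; the hook signature is an assumption
import logging

from airflow.listeners import hookimpl
from airflow.plugins_manager import AirflowPlugin

log = logging.getLogger(__name__)


class AssetListener:
    @hookimpl
    def on_asset_created(self, asset):
        # runs whenever a new asset is registered in this Airflow instance
        log.info("Asset created: %s", asset)


class AssetListenerPlugin(AirflowPlugin):
    name = "asset_listener_plugin"
    listeners = [AssetListener()]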

Asset definition

An asset is registered as an object in the Airflow metadata database as soon as it is referenced in either the outlets parameter of a task or the schedule of a DAG. For information on how to create an asset alias, see Use Asset Aliases.

Basic Asset definition

The simplest asset schedule is one DAG scheduled based on updates to one asset that is updated by one task. In this example, we define that the my_producer_task task in the my_producer_dag DAG produces updates to the my_asset asset, creating attached asset events, and schedule the my_consumer_dag DAG to run once for every asset event created.

First, provide the asset to the outlets parameter of the producer task.

from airflow.sdk import Asset, dag, task

@dag
def my_producer_dag():

    @task(outlets=[Asset("my_asset")])
    def my_producer_task():
        pass

    my_producer_task()


my_producer_dag()

You can see the relationship between the DAG containing the producing task (my_producer_dag) and the asset in the Asset Graph located in the Assets tab of the Airflow UI.

Screenshot of the Dependency Graph of the Assets tab showing my_producer_dag connected to the my_asset asset.

The graph view of the my_producer_dag shows the asset as well, if external conditions or all DAG dependencies are selected in the graph Options.

Screenshot of a DAG Graph showing my_producer_task connected to the my_asset asset.

Next, schedule the my_consumer_dag to run as soon as a new asset event is produced to the my_asset asset.

from airflow.sdk import Asset, dag
from airflow.providers.standard.operators.empty import EmptyOperator


@dag(
    schedule=[Asset("my_asset")],
)
def my_consumer_dag():

    EmptyOperator(task_id="empty_task")


my_consumer_dag()

You can see the relationship between the DAG containing the producing task (my_producer_dag), the consuming DAG my_consumer_dag and the asset in the asset graph located in the Assets tab of the Airflow UI.

Screenshot of the Dependency Graph of the Assets tab showing my_producer_dag connected to the my_asset asset which is connected to my_consumer_dag

When external conditions or all DAG dependencies are selected, the my_consumer_dag graph shows the asset as well.

Screenshot of a DAG Graph showing the my_asset asset connected to my_consumer_dag.

After unpausing the my_consumer_dag, every successful completion of the my_producer_task task triggers a run of the my_consumer_dag.

Screenshot DAGs page with one run each of the my_producer_dag and my_consumer_dag as well as the asset schedule displayed

The producing task lists the Asset Events it caused in its details page, including a link to the Triggered Dag Run.

Screenshot of the Details tab of the my_producer_task showing one Asset event of the my_asset with one Triggered Dag Run

The triggered DAG run of the my_consumer_dag also lists the asset event, including a link to the source DAG from within which the asset event was created.

Screenshot of the Details tab of the DAG run of the my_consumer_dag showing one Asset event of the my_asset

Use asset aliases

You have the option to create asset aliases to schedule DAGs based on assets with names generated at runtime. An asset alias is defined by a unique name string and can be used in place of a regular asset in outlets and schedules. Any number of asset events updating different assets can be attached to an asset alias.

There are two ways to add an asset event to an asset alias:

  • Using the Metadata class.
  • Using outlet_events pulled from the Airflow context.

See the code below for an example; note how the name of the asset is determined at runtime inside the producing task.

# from airflow.sdk import Asset, AssetAlias, Metadata, task

my_alias_name = "my_alias"


@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_metadata():
    bucket_name = "my-bucket"  # determined at runtime, for example based on upstream input
    yield Metadata(
        asset=Asset(f"updated_{bucket_name}"),
        extra={"k": "v"},  # extra has to be provided, can be {}
        alias=AssetAlias(my_alias_name),
    )


attach_event_to_alias_metadata()
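Alternatively, you can attach the asset event to the alias through outlet_events pulled from the Airflow context. A minimal sketch, assuming the same alias name as above:

@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_outlet_events(**context):
    bucket_name = "my-bucket"  # determined at runtime, for example based on upstream input
    context["outlet_events"][AssetAlias(my_alias_name)].add(
        Asset(f"updated_{bucket_name}")
    )


attach_event_to_alias_outlet_events()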

In the consuming DAG you can use an asset alias in place of a regular asset.

from airflow.sdk import AssetAlias, dag
from airflow.providers.standard.operators.empty import EmptyOperator

my_alias_name = "my_alias"


@dag(schedule=[AssetAlias(my_alias_name)])
def my_consumer_dag():

    EmptyOperator(task_id="empty_task")


my_consumer_dag()

Once the my_producer_dag containing the attach_event_to_alias_metadata task completes successfully, reparsing of all DAGs scheduled on the asset alias my_alias is automatically triggered. This reparsing step attaches the updated_{bucket_name} asset to the my_alias asset alias and the schedule resolves, triggering one run of the my_consumer_dag.

Any further asset event for the updated_{bucket_name} asset will now trigger the my_consumer_dag. If you attach asset events for several assets to the same asset alias, a DAG scheduled on that asset alias will run as soon as any of the assets that were ever attached to the asset alias receive an update.

See Dynamic data events emitting and asset creation through AssetAlias for more information and examples of using asset aliases.

To use asset aliases with traditional operators, you need to attach the asset event to the alias inside the operator logic. For operators other than the PythonOperator, you can do so either in a custom operator's .execute method or by passing a post_execute callable to an existing operator (experimental). Use outlet_events when attaching asset events to aliases in traditional or custom operators. Note that for deferrable operators, attaching an asset event to an alias is only supported in the execute_complete or post_execute method.

def _attach_event_to_alias(context, result):  # result = the return value of the execute method
    # use any logic to determine the URI
    uri = "s3://my-bucket/my_file.txt"
    context["outlet_events"][AssetAlias(my_alias_name)].add(Asset(uri))


BashOperator(
    task_id="t2",
    bash_command="echo hi",
    outlets=[AssetAlias(my_alias_name)],
    post_execute=_attach_event_to_alias,  # using the post_execute parameter is experimental
)
The following example shows a custom operator that attaches an asset event to an asset alias.
"""
### AssetAlias in a custom operator
"""

from airflow.sdk import Asset, AssetAlias, dag
from airflow.sdk.bases.operator import BaseOperator
import logging

t_log = logging.getLogger("airflow.task")

my_alias_name = "my-alias"


# custom operator producing to an asset alias
class MyOperator(BaseOperator):
"""
Simple example operator that attaches an asset event to an asset alias.
:param my_bucket_name: (str) The name of the bucket to use in the asset name.
"""

# define the .__init__() method that runs when the DAG is parsed
def __init__(self, my_bucket_name, my_alias_name, *args, **kwargs):
# initialize the parent operator
super().__init__(*args, **kwargs)
# assign class variables
self.my_bucket_name = my_bucket_name
self.my_alias_name = my_alias_name

def execute(self, context):

# add your custom operator logic here

# use any logic to derive the dataset URI
my_asset_name = f"updated_{self.my_bucket_name}"
context["outlet_events"][AssetAlias(self.my_alias_name)].add(
Asset(my_asset_name)
)

return "hi :)"

# define the .post_execute() method that runs after the execute method (optional)
# result is the return value of the execute method
def post_execute(self, context, result=None):
# write to Airflow task logs
self.log.info("Post-execution step")

# It is also possible to add events to the alias in the post_execute method


@dag
def asset_alias_custom_operator():

MyOperator(
task_id="t1",
my_bucket_name="my-bucket",
my_alias_name=my_alias_name,
outlets=[AssetAlias(my_alias_name)],
)


asset_alias_custom_operator()

Updating an asset

There are five ways to update an asset:

  • A DAG defined using @asset completes successfully. Under the hood, @asset creates a DAG with one task which produces the asset.

  • A task with an outlets parameter that references the asset completes successfully.

  • A POST request to the assets endpoint of the Airflow REST API.

  • An AssetWatcher that listens for a TriggerEvent caused by a message in a message queue. See event-driven scheduling for more information.

  • A manual update in the Airflow UI by using the Create Asset Event button on the asset graph. There are two options when creating an asset event in the UI:

    • Materialize: This option runs the full DAG which contains the task that produces the asset event.
    • Manual: This option directly creates a new asset event without running any task that would normally produce the asset event. This option is useful for testing or when you want to create an asset event for an asset that is not updated from within a DAG in this Airflow instance.

    Screenshot of the Airflow UI showing manual updates to an asset.

Attaching information to an asset event

When updating an asset in the Airflow UI or making a POST request to the Airflow REST API, you can attach extra information to the asset event by providing an extra JSON payload. From within the producing task, you can add extra information using either the Metadata class or the outlet_events object from the Airflow context. You can attach any information computed within the task to the extra, for example information about the asset you are working with.

To use the Metadata class to attach information to an asset, follow the example in the code snippet below. Make sure that the asset used in the Metadata class is also defined as an outlet of the producer task.

# from airflow.sdk import Asset, Metadata, task

my_asset_1 = Asset("x-asset1")

@task(outlets=[my_asset_1])
def attach_extra_using_metadata():
    num = 23
    yield Metadata(my_asset_1, {"myNum": num})

    return "hello :)"


attach_extra_using_metadata()

You can also access the outlet_events from the Airflow context directly to add an extra dictionary to an asset event.

# from airflow.sdk import Asset, Metadata, task

my_asset_2 = Asset("x-asset2")

@task(outlets=[my_asset_2])
def use_outlet_events(**context):
    num = 19
    context["outlet_events"][my_asset_2].extra = {"my_num": num}

    return "hello :)"


use_outlet_events()

Asset extras can be viewed in the Airflow UI in the asset graph of an asset.

Screenshot of the Asset extra information.

Retrieving asset information in a downstream task

Extras can be programmatically retrieved from within Airflow tasks. Any Airflow task instance in a DAG run has access to the list of assets that were involved in triggering that specific DAG run (triggering_asset_events). Additionally, you can give any Airflow task access to all asset events of a specific asset by providing the asset to the task's inlets parameter. Defining inlets does not affect the schedule of the DAG.

To access all asset events that were involved in triggering a DAG run within a TaskFlow API task, pull them from the Airflow context. In a traditional operator, you can use Jinja templating in any templateable field of the operator to pull information from the Airflow context; a sketch follows the TaskFlow example below.

# from airflow.sdk import task

@task
def get_extra_triggering_run(**context):
    # all events that triggered this specific DAG run
    triggering_asset_events = context["triggering_asset_events"]
    # the loop below won't run if the DAG is manually triggered
    for asset, asset_list in triggering_asset_events.items():
        print(asset, asset_list)
        print(asset_list[0].extra)
        # you can also fetch the run_id and other information about the upstream DAGs
        print(asset_list[0].source_run_id)
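As referenced above, a traditional operator can pull the same information through Jinja templating in any templateable field. A minimal sketch using the BashOperator, with an illustrative task ID:

from airflow.providers.standard.operators.bash import BashOperator

BashOperator(
    task_id="print_triggering_asset_events",
    # templateable fields render from the same Airflow context
    bash_command="echo '{{ triggering_asset_events }}'",
)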

If you want to access asset extras independently of which asset events triggered a DAG run, you can provide an asset to a task as an inlet. In a TaskFlow API task, you can fetch the inlet_events from the Airflow context.

# from airflow.sdk import Asset, task

my_asset_2 = Asset("x-asset2")

# note that my_asset_2 does not need to be part of the DAG's schedule
# you can provide as many inlets as you wish
@task(inlets=[my_asset_2])
def get_extra_inlet(**context):
    # inlet_events are listed earliest to latest by timestamp
    asset_events = context["inlet_events"][my_asset_2]
    # protect against the asset not existing
    if len(asset_events) == 0:
        print(f"No asset_events for {my_asset_2.uri}")
    else:
        # access the latest asset event for this asset
        # if the extra does not exist, this returns None
        my_extra = asset_events[-1].extra
        print(my_extra)


get_extra_inlet()

Note that you can programmatically retrieve information from asset aliases as well, see Fetching information from previously emitted asset events through resolved asset aliases for more information.

Asset schedules

Any number of assets can be provided to the schedule parameter. There are three types of asset schedules:

  • schedule=[Asset("a"), Asset("b")]: Providing one or more Assets as a list. The DAG is scheduled to run after all Assets in the list have received at least one update.
  • schedule=(Asset("a") | Asset("b")): Using AND (&) and OR (|) operators to create a conditional asset expression. Note that asset expressions are enclosed in parentheses ().
  • AssetOrTimeSchedule: Combining time based scheduling with asset expressions, see combined asset and time-based scheduling.

When scheduling DAGs based on assets, keep the following in mind:

  • Consumer DAGs that are scheduled on an asset are triggered every time a task that updates that asset completes successfully. For example, if task1 and task2 both produce asset_a, a consumer DAG of asset_a runs twice - first when task1 completes, and again when task2 completes.
  • Consumer DAGs scheduled on an asset are triggered as soon as the first task with that asset as an outlet finishes, even if there are downstream producer tasks that also operate on the asset.
  • Consumer DAGs scheduled on multiple assets run as soon as their expression is fulfilled by at least one asset event per asset in the expression. This means that it does not matter to the consuming DAG whether an asset received additional updates in the meantime, it consumes all queued events for one asset as one input. See Multiple Assets for more information.
  • A consumer DAG that is paused ignores all updates to assets that occurred while it was paused; it starts with a blank slate once it is unpaused.
  • DAGs that are triggered by assets do not have the concept of a data interval. If you need information about the triggering event in your downstream DAG, you can use the triggering_asset_events parameter from the context. This parameter provides a list of all triggering asset events, each with the attributes timestamp, source_dag_id, source_task_id, source_run_id, and source_map_index. See Retrieving asset information in a downstream task for an example.

Conditional asset scheduling

You can use logical operators to combine any number of assets provided to the schedule parameter. The logical operators supported are | for OR and & for AND.

For example, to schedule a DAG on an update to either asset1, asset2, asset3, or asset4, you can use the following syntax. Note that the full statement is wrapped in ().

from airflow.sdk import Asset, dag

@dag(
    schedule=(
        Asset("asset1")
        | Asset("asset2")
        | Asset("asset3")
        | Asset("asset4")
    ),  # Use () instead of [] to be able to use conditional asset scheduling!
)
def downstream1_on_any():

    # your tasks here
    pass


downstream1_on_any()

The downstream1_on_any DAG is triggered whenever any of the assets asset1, asset2, asset3, or asset4 are updated. When clicking on x of 4 Assets updated in the DAGs view, you can see the asset expression that defines the schedule.

Screenshot of the Airflow UI with a pop up showing the asset expression for the downstream1_on_any DAG listing the 4 assets under "any"

You can also combine the logical operators to create more complex expressions. For example, to schedule a DAG on an update to either asset1 or asset2 and either asset3 or asset4, you can use the following syntax:

from airflow.sdk import Asset, dag

@dag(
    schedule=(
        (Asset("asset1") | Asset("asset2"))
        & (Asset("asset3") | Asset("asset4"))
    ),  # Use () instead of [] to be able to use conditional asset scheduling!
)
def downstream2_one_in_each_group():

    # your tasks here
    pass


downstream2_one_in_each_group()

Combined asset and time-based scheduling

You can combine asset-based scheduling with time-based scheduling with the AssetOrTimeSchedule timetable. A DAG scheduled with this timetable will run either when its timetable condition is met or when its asset condition is met.

The DAG shown below runs on a time-based schedule defined by the 0 0 * * * cron expression, which is every day at midnight. The DAG also runs when either asset3 or asset4 is updated.

from airflow.sdk import Asset, dag, task
from pendulum import datetime
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    start_date=datetime(2025, 3, 1),
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        assets=(Asset("asset3") | Asset("asset4")),
        # Use () instead of [] to be able to use conditional asset scheduling!
    ),
)
def toy_downstream3_asset_and_time_schedule():

    # your tasks here
    pass


toy_downstream3_asset_and_time_schedule()

Example implementation

In the following example, the write_instructions_to_file and write_info_to_file tasks are both producer tasks because they have defined outlets.

from airflow.sdk import Asset, dag, task

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")


@dag
def assets_producer_dag():
    @task
    def get_cocktail(api):
        import requests

        r = requests.get(api)
        return r.json()

    @task(outlets=[INSTRUCTIONS])
    def write_instructions_to_file(response):
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_instructions = response["drinks"][0]["strInstructions"]
        msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"

        f = open("include/cocktail_instructions.txt", "a")
        f.write(msg)
        f.close()

    @task(outlets=[INFO])
    def write_info_to_file(response):
        import time

        time.sleep(30)
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_category = response["drinks"][0]["strCategory"]
        alcohol = response["drinks"][0]["strAlcoholic"]
        msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
        f = open("include/cocktail_info.txt", "a")
        f.write(msg)
        f.close()

    cocktail = get_cocktail(api=API)

    write_instructions_to_file(cocktail)
    write_info_to_file(cocktail)


assets_producer_dag()

A consumer DAG runs whenever the asset(s) it is scheduled on is updated by a producer task, rather than running on a time-based schedule. For example, if you have a DAG that should run when the INSTRUCTIONS and INFO assets are updated, you define the DAG's schedule using the names of those two assets.

Any DAG that is scheduled with an asset is considered a consumer DAG even if that DAG doesn't actually access the referenced asset. In other words, it's up to you as the DAG author to correctly reference and use assets.

from airflow.sdk import Asset, dag, task

INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")


@dag(schedule=[INSTRUCTIONS, INFO]) # Scheduled on both assets
def assets_consumer_dag():
    @task
    def read_about_cocktail():
        cocktail = []
        for filename in ("info", "instructions"):
            with open(f"include/cocktail_{filename}.txt", "r") as f:
                contents = f.readlines()
                cocktail.append(contents)

        return [item for sublist in cocktail for item in sublist]

    read_about_cocktail()


assets_consumer_dag()
