Assets and data-aware scheduling in Airflow
With Assets, DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to these assets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.
Assets can help resolve common issues. For example, consider a data engineering team with a DAG that creates an asset and a machine learning team with a DAG that trains a model on the asset. Using assets, the machine learning team's DAG runs only when the data engineering team's DAG has produced an update to the asset.
In this guide, you'll learn:
- When to use assets in Airflow.
- How to use the @asset syntax to create data-oriented pipelines.
- How to define Airflow tasks as producers of assets.
- How to run DAGs based on basic and advanced asset schedules.
- How to use asset aliases to create dynamic asset schedules.
- How to attach information to, and retrieve information from, asset events.
Assets can be used to schedule a DAG based on messages in a message queue. This sub-type of data-aware scheduling is called event-driven scheduling. See Schedule DAGs based on Events in a Message Queue for more information.
Assets are a separate feature from object storage, which allows you to interact with files in cloud and local object storage systems. To learn more about using Airflow to interact with files, see Use Airflow object storage to interact with cloud storage in an ML pipeline.
Assumed knowledge
To get the most out of this guide, you should have existing knowledge of:
- Airflow scheduling concepts. See Schedule DAGs in Airflow.
@asset syntax
The @asset decorator is a shorthand to create one DAG with one task that produces an asset. This decorator is used in the asset-oriented approach to writing DAGs, which represents a mindset shift that puts the data asset front and center. Whether to use the asset-oriented or the task-oriented approach to writing DAGs is a matter of preference. DAGs created using the asset-oriented approach are listed like any other DAG in the Airflow UI.
The following code snippet defines a DAG with the DAG ID my_asset that runs on a @daily schedule. It contains one task with the task ID my_asset that, upon successful completion, updates an asset with the name my_asset.
from airflow.sdk import asset
@asset(schedule="@daily")
def my_asset():
# your task logic here
pass
You can schedule assets based on other assets to create data-centric pipelines. Since each @asset decorator creates one DAG, data needs to be passed via cross-DAG XComs. Below you can see the same simple ETL pipeline accomplished using the asset-oriented and the task-oriented approach.
- Asset-oriented approach
- Task-oriented approach
from airflow.sdk import asset
@asset(schedule="@daily")
def extracted_data():
return {"a": 1, "b": 2}
@asset(schedule=extracted_data)
def transformed_data(context):
data = context["ti"].xcom_pull(
dag_id="extracted_data",
task_ids="extracted_data",
key="return_value",
include_prior_dates=True,
)
return {k: v * 2 for k, v in data.items()}
@asset(schedule=transformed_data)
def loaded_data(context):
data = context["task_instance"].xcom_pull(
dag_id="transformed_data",
task_ids="transformed_data",
key="return_value",
include_prior_dates=True,
)
summed_data = sum(data.values())
print(f"Summed data: {summed_data}")
from airflow.sdk import Asset, dag, task
@dag(schedule="@daily")
def extract_dag():
@task(outlets=[Asset("extracted_data")])
def extract_task():
return {"a": 1, "b": 2}
extract_task()
extract_dag()
@dag(schedule=[Asset("extracted_data")])
def transform_dag():
@task(outlets=[Asset("transformed_data")])
def transform_task(**context):
data = context["ti"].xcom_pull(
dag_id="extract_dag",
task_ids="extract_task",
key="return_value",
include_prior_dates=True,
)
return {k: v * 2 for k, v in data.items()}
transform_task()
transform_dag()
@dag(schedule=[Asset("transformed_data")])
def load_dag():
@task
def load_task(**context):
data = context["ti"].xcom_pull(
dag_id="transform_dag",
task_ids="transform_task",
key="return_value",
include_prior_dates=True,
)
summed_data = sum(data.values())
print(f"Summed data: {summed_data}")
load_task()
load_dag()
The code above creates three DAGs that depend on each other, each containing one task that updates one asset.
To update several assets from the same DAG written with the asset-oriented approach, you can use @asset.multi. The code example below creates one DAG with the DAG ID my_multi_asset that contains one task called my_multi_asset, which upon successful completion updates two assets with the names asset_a and asset_b.
from airflow.sdk import Asset, asset
@asset.multi(schedule="@daily", outlets=[Asset("asset_a"), Asset("asset_b")])
def my_multi_asset():
pass
Asset concepts
Aside from using the @asset decorator, you can define assets in your task-oriented DAG code as well, and use them to create cross-DAG or even cross-Deployment dependencies that represent the flow of concrete or abstract data objects through your DAGs. This section covers definitions for asset terminology, as well as general information on how to use assets.
Asset terminology
You can define assets in your DAG code and use them to create cross-DAG dependencies. Airflow uses the following terms related to the assets feature:
- Asset: an object in Airflow that represents a concrete or abstract data entity and is defined by a unique name. Optionally, a URI can be attached to the asset when it represents a concrete data entity, like a file in object storage or a table in a relational database.
- @asset: a decorator that can be used to write a DAG with the asset-oriented approach, directly declaring the desired asset in Python with less boilerplate code. Using @asset creates a DAG with a single task that updates an asset. See @asset syntax for more information.
- Asset event: an event that is attached to an asset and created whenever a producer task updates that particular asset. An asset event is defined by the asset it is attached to plus the timestamp of when a producer task updated the asset. Optionally, an asset event can contain an extra dictionary with additional information about the asset or asset event.
- Asset schedule: the schedule of a DAG that is triggered as soon as asset events for one or more assets are created. All assets a DAG is scheduled on are shown in the DAG graph in the Airflow UI, as well as reflected in the dependency graph of the Assets tab.
- Producer task: a task that produces updates to one or more assets provided to its outlets parameter, creating asset events when it completes successfully.
- Asset expression: a logical expression using AND (&) and OR (|) operators to define the schedule of a DAG scheduled on updates to several assets.
- Queued asset event: It is common to have DAGs scheduled to run as soon as a set of assets has received at least one update each. While asset events are still missing to trigger the DAG, all asset events for the other assets the DAG is scheduled on are queued asset events. A queued asset event is defined by its asset, its timestamp, and the DAG it is queued for. One asset event can create queued asset events for several DAGs. You can access queued asset events for a specific DAG or a specific asset programmatically, using the Airflow REST API.
- AssetAlias: an object that can be associated with one or more assets and used to create schedules based on assets created at runtime, see Use asset aliases. An asset alias is defined by a unique name.
- Metadata: a class to attach extra information to an asset event from within the producer task. This functionality can be used to pass asset-related metadata between tasks, see Attaching information to an asset event.
- AssetWatcher: a class that is used in event-driven scheduling to watch for a TriggerEvent caused by a message in a message queue.
Two parameters relating to Airflow assets exist in all Airflow operators and decorators:
- Outlets: a task parameter that contains the list of assets a specific task produces updates to as soon as it completes successfully. All outlets of a task are shown in the DAG graph in the Airflow UI, as well as reflected in the dependency graph of the Assets tab, as soon as the DAG code is parsed, that is, independently of whether any asset events have occurred. Note that Airflow is not yet aware of the underlying data. It is up to you to determine which tasks should be considered producer tasks for an asset. As long as a task has an outlet asset, Airflow considers it a producer task even if that task doesn't operate on the referenced asset.
- Inlets: a task parameter that contains the list of assets a specific task has access to, typically to access extra information from related asset events. Defining inlets for a task does not affect the schedule of the DAG containing the task, and the relationship is not reflected in the Airflow UI.
To summarize, tasks produce updates to assets given to their outlets parameter, and this action creates asset events. DAGs can be scheduled based on asset events created for one or more assets, and tasks can be given access to all events attached to an asset by defining the asset as one of their inlets. An asset is defined as an object in the Airflow metadata database as soon as it is referenced in either the outlets parameter of a task or the schedule of a DAG.
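The minimal sketch below ties these terms together: one DAG contains a producer task with an asset in its outlets, and a second DAG is scheduled on that asset and reads its asset events through inlets. The asset name and URI are illustrative placeholders, not part of the examples elsewhere in this guide.

```python
# A minimal sketch combining assets, outlets, asset schedules, and inlets.
# The asset name and URI are illustrative placeholders.
from airflow.sdk import Asset, dag, task

# an asset representing a concrete data entity, with an optional URI
my_table = Asset("my_table", uri="postgresql://prod/public/my_table")


@dag
def terminology_producer_dag():
    # producer task: creates an asset event for my_table when it succeeds
    @task(outlets=[my_table])
    def update_table():
        pass

    update_table()


terminology_producer_dag()


@dag(schedule=[my_table])  # asset schedule: runs on asset events for my_table
def terminology_consumer_dag():
    # the inlet gives the task access to asset events for my_table;
    # it does not affect the DAG's schedule
    @task(inlets=[my_table])
    def read_events(**context):
        print(context["inlet_events"][my_table])

    read_events()


terminology_consumer_dag()
```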
Why use Airflow assets?
Assets allow you to define explicit dependencies between DAGs and updates to your data. This helps you to:
- Standardize communication between teams. Assets can function like an API to communicate when data in a specific location has been updated and is ready for use.
- Reduce the amount of code necessary to implement cross-DAG dependencies. Even if your DAGs don't depend on data updates, you can create a dependency that triggers a DAG after a task in another DAG updates an asset.
- Get better visibility into how your DAGs are connected and how they depend on data. The Assets graphs in the Airflow UI display how assets and DAGs depend on each other and can be used to navigate between them.
- Reduce costs, because assets, unlike sensors or other implementations of cross-DAG dependencies, do not take up a worker slot.
- Create cross-deployment dependencies using the Airflow REST API. Astro customers can use the Cross-deployment dependencies best practices documentation for guidance.
- Create complex data-driven schedules using Conditional Asset Scheduling and Combined Asset and Time-based Scheduling.
- Schedule a DAG based on messages in a message queue with event-driven scheduling.
Using assets
When you work with assets, keep the following considerations in mind:
- Asset events are only registered by DAGs or listeners in the same Airflow environment. If you want to create cross-Deployment dependencies with assets, you need to use the Airflow REST API to create an asset event in the Airflow environment where your downstream DAG is located. See Cross-deployment dependencies for an example implementation on Astro.
- Airflow monitors assets only within the context of DAGs and tasks. It does not monitor updates to assets that occur outside of Airflow. For example, Airflow will not notice if you manually add a file to an S3 bucket referenced by an asset. To create Airflow dependencies based on outside events, use Airflow sensors or deferrable operators, or consider using a message queue as an intermediary and implement event-driven scheduling.
- The Assets tab in the Airflow UI provides a list of active assets in your Airflow environment with an asset graph for each asset showing its dependencies to DAGs and other assets.
You can use listeners to enable Airflow to run any code when certain asset events occur anywhere in your Airflow instance. There are listener hooks for the following events:
- on_asset_created
- on_asset_alias_created
- on_asset_updated
For examples, refer to our Create Airflow listeners tutorial. Note that listeners are an advanced feature of Airflow. They are not isolated from the Airflow components they run in, and can slow down or in some cases take down your Airflow instance. As such, take extra care when writing listeners. Asset event listeners are an experimental feature.
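For orientation, here is a minimal sketch of what an asset event listener registered through an Airflow plugin might look like. The hook names are taken from the list above; the exact hook signatures (for example, whether the asset is passed as a positional or keyword argument) are an assumption to verify against the listeners documentation for your Airflow version.

```python
# listeners.py -- a minimal sketch of asset event listener hooks.
# Assumption: each hook receives the relevant asset object as its argument;
# verify the exact signatures in the listeners documentation for your version.
from airflow.listeners import hookimpl


@hookimpl
def on_asset_created(asset):
    print(f"Asset created: {asset}")


@hookimpl
def on_asset_updated(asset):
    print(f"Asset updated: {asset}")


# plugins/asset_listener_plugin.py -- registers the listener module as a plugin
from airflow.plugins_manager import AirflowPlugin

import listeners  # the module defined above


class AssetListenerPlugin(AirflowPlugin):
    name = "asset_listener_plugin"
    listeners = [listeners]
```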
Asset definition
An asset is defined as an object in the Airflow metadata database as soon as it is referenced in either the outlets parameter of a task or the schedule of a DAG. For information on how to create an asset alias, see Use asset aliases.
Basic Asset definition
The simplest asset schedule is one DAG scheduled based on updates to one asset that is produced by one task. In this example, we define that the my_producer_task task in the my_producer_dag DAG produces updates to the my_asset asset, creating attached asset events, and schedule the my_consumer_dag DAG to run once for every asset event created.
First, provide the asset to the outlets parameter of the producer task.
- TaskFlow API
- Traditional syntax
from airflow.sdk import Asset, dag, task
@dag
def my_producer_dag():
@task(outlets=[Asset("my_asset")])
def my_producer_task():
pass
my_producer_task()
my_producer_dag()
from airflow.sdk import Asset, DAG
from airflow.providers.standard.operators.python import PythonOperator
with DAG(dag_id="my_producer_dag"):
def my_function():
pass
my_task = PythonOperator(
task_id="my_producer_task",
python_callable=my_function,
outlets=[Asset("my_asset")]
)
You can see the relationship between the DAG containing the producing task (my_producer_dag) and the asset in the asset graph located in the Assets tab of the Airflow UI.
The graph view of the my_producer_dag shows the asset as well, if External conditions or All DAG dependencies are selected in the graph Options.
Next, schedule the my_consumer_dag to run as soon as a new asset event is produced to the my_asset asset.
- TaskFlow API
- Traditional syntax
from airflow.sdk import Asset, dag
from airflow.providers.standard.operators.empty import EmptyOperator
@dag(
schedule=[Asset("my_asset")],
)
def my_consumer_dag():
EmptyOperator(task_id="empty_task")
my_consumer_dag()
from airflow.sdk import Asset, DAG
from airflow.providers.standard.operators.empty import EmptyOperator
with DAG(
dag_id="my_consumer_dag",
schedule=[Asset("my_asset")]
):
EmptyOperator(task_id="empty_task")
You can see the relationship between the DAG containing the producing task (my_producer_dag), the consuming DAG (my_consumer_dag), and the asset in the asset graph located in the Assets tab of the Airflow UI.
When External conditions or All DAG dependencies are selected, the my_consumer_dag graph shows the asset as well.
After unpausing the my_consumer_dag, every successful completion of the my_producer_task task triggers a run of the my_consumer_dag.
The producing task lists the asset events it caused on its details page, including a link to the triggered DAG run.
The triggered DAG run of the my_consumer_dag also lists the asset event, including a link to the source DAG from which the asset event was created.
Use asset aliases
You have the option to create asset aliases to schedule DAGs based on assets with names generated at runtime. An asset alias is defined by a unique name string and can be used in place of a regular asset in outlets and schedules. Any number of asset events updating different assets can be attached to an asset alias.
There are two ways to add an asset event to an asset alias:
- Using the Metadata class.
- Using outlet_events pulled from the Airflow context.
See the code below for examples. Note how the name of the asset is determined at runtime inside the producing task.
- Metadata
- Outlet events
# from airflow.sdk import Asset, AssetAlias, Metadata, task
my_alias_name = "my_alias"
@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_metadata():
bucket_name = "my-bucket" # determined at runtime, for example based on upstream input
yield Metadata(
asset=Asset(f"updated_{bucket_name}"),
extra={"k": "v"}, # extra has to be provided, can be {}
alias=AssetAlias(my_alias_name),
)
attach_event_to_alias_metadata()
# from airflow.sdk import Asset, AssetAlias, Metadata, task
my_alias_name = "my_alias"
@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_context(**context):
bucket_name = "my-other-bucket" # determined at runtime, for example based on upstream input
outlet_events = context["outlet_events"]
outlet_events[AssetAlias(my_alias_name)].add(
Asset(f"updated_{bucket_name}"), extra={"k": "v"}
) # extra is optional
attach_event_to_alias_context()
In the consuming DAG you can use an asset alias in place of a regular asset.
from airflow.sdk import AssetAlias, dag
from airflow.providers.standard.operators.empty import EmptyOperator
my_alias_name = "my_alias"
@dag(schedule=[AssetAlias(my_alias_name)])
def my_consumer_dag():
EmptyOperator(task_id="empty_task")
my_consumer_dag()
Once the my_producer_dag containing the attach_event_to_alias_metadata task completes successfully, reparsing of all DAGs scheduled on the asset alias my_alias is automatically triggered. This reparsing step attaches the updated_{bucket_name} asset to the my_alias asset alias, and the schedule resolves, triggering one run of the my_consumer_dag.
Any further asset event for the updated_{bucket_name} asset now triggers the my_consumer_dag. If you attach asset events for several assets to the same asset alias, a DAG scheduled on that asset alias runs as soon as any of the assets that were ever attached to the asset alias receives an update.
See Dynamic data events emitting and asset creation through AssetAlias for more information and examples of using asset aliases.
To use asset aliases with traditional operators, you need to attach the asset event to the alias inside the operator logic. If you are using operators other than the PythonOperator, you can do so either in a custom operator's .execute method or by passing a post_execute callable to an existing operator (experimental). Use outlet_events when attaching asset events to aliases in traditional or custom operators. Note that for deferrable operators, attaching an asset event to an alias is only supported in the execute_complete or post_execute method.
# from airflow.sdk import Asset, AssetAlias
# from airflow.providers.standard.operators.bash import BashOperator
my_alias_name = "my_alias"

def _attach_event_to_alias(context, result):  # result = the return value of the execute method
# use any logic to determine the URI
uri = "s3://my-bucket/my_file.txt"
context["outlet_events"][AssetAlias(my_alias_name)].add(Asset(uri))
BashOperator(
task_id="t2",
bash_command="echo hi",
outlets=[AssetAlias(my_alias_name)],
post_execute=_attach_event_to_alias, # using the post_execute parameter is experimental
)
The following example shows a custom operator that attaches an asset event to an asset alias.
"""
### AssetAlias in a custom operator
"""
from airflow.sdk import Asset, AssetAlias, dag
from airflow.sdk.bases.operator import BaseOperator
import logging
t_log = logging.getLogger("airflow.task")
my_alias_name = "my-alias"
# custom operator producing to an asset alias
class MyOperator(BaseOperator):
"""
Simple example operator that attaches an asset event to an asset alias.
:param my_bucket_name: (str) The name of the bucket to use in the asset name.
"""
# define the .__init__() method that runs when the DAG is parsed
def __init__(self, my_bucket_name, my_alias_name, *args, **kwargs):
# initialize the parent operator
super().__init__(*args, **kwargs)
# assign class variables
self.my_bucket_name = my_bucket_name
self.my_alias_name = my_alias_name
def execute(self, context):
# add your custom operator logic here
# use any logic to derive the dataset URI
my_asset_name = f"updated_{self.my_bucket_name}"
context["outlet_events"][AssetAlias(self.my_alias_name)].add(
Asset(my_asset_name)
)
return "hi :)"
# define the .post_execute() method that runs after the execute method (optional)
# result is the return value of the execute method
def post_execute(self, context, result=None):
# write to Airflow task logs
self.log.info("Post-execution step")
# It is also possible to add events to the alias in the post_execute method
@dag
def asset_alias_custom_operator():
MyOperator(
task_id="t1",
my_bucket_name="my-bucket",
my_alias_name=my_alias_name,
outlets=[AssetAlias(my_alias_name)],
)
asset_alias_custom_operator()
Updating an asset
There are five ways to update an asset:
- A DAG defined using @asset completes successfully. Under the hood, @asset creates a DAG with one task that produces the asset.
- A task with an outlets parameter that references the asset completes successfully.
- A POST request is made to the assets endpoint of the Airflow REST API (see the sketch after this list).
- An AssetWatcher detects a TriggerEvent caused by a message in a message queue. See event-driven scheduling for more information.
- A manual update is made in the Airflow UI by using the Create Asset Event button on the asset graph. There are two options when creating an asset event in the UI:
  - Materialize: This option runs the full DAG that contains the task that produces the asset event.
  - Manual: This option directly creates a new asset event without running any task that would normally produce the asset event. This option is useful for testing or when you want to create an asset event for an asset that is not updated from within a DAG in this Airflow instance.
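For the REST API option, the following is a hedged sketch using the requests library. The base path, the asset events endpoint, the payload fields, and the bearer-token authentication shown here are assumptions; verify them against the Airflow REST API reference for your Airflow version and deployment.

```python
# A sketch of creating an asset event via the Airflow REST API.
# Assumptions to verify against your version's REST API reference:
# the API base path, the asset events endpoint, the payload field that
# identifies the asset, and bearer-token authentication.
import requests

BASE_URL = "http://localhost:8080/api/v2"  # assumption: API base path
TOKEN = "<access-token>"                   # assumption: token auth is configured

resp = requests.post(
    f"{BASE_URL}/assets/events",           # assumption: asset events endpoint
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "asset_uri": "my_asset",                 # assumption: asset identified by its URI/name
        "extra": {"source": "external_system"},  # optional extra attached to the event
    },
    timeout=30,
)
resp.raise_for_status()
print(resp.json())
```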
Attaching information to an asset event
When updating an asset in the Airflow UI or making a POST request to the Airflow REST API, you can attach extra information to the asset event by providing an extra JSON payload. You can also add extra information from within the producing task, using either the Metadata class or by accessing outlet_events from the Airflow context. You can attach any information that was computed within the task to the extra, for example information about the asset you are working with.
To use the Metadata class to attach information to an asset event, follow the example in the code snippet below. Make sure that the asset used in the Metadata class is also defined as an outlet in the producer task.
- TaskFlow API
- Traditional syntax
# from airflow.sdk import Asset, Metadata, task
my_asset_1 = Asset("x-asset1")
@task(outlets=[my_asset_1])
def attach_extra_using_metadata():
num = 23
yield Metadata(my_asset_1, {"myNum": num})
return "hello :)"
attach_extra_using_metadata()
# from airflow.providers.standard.operators.python import PythonOperator
# from airflow.sdk import Asset, Metadata
my_asset_1 = Asset("x-asset1")
def attach_extra_using_metadata_func():
num = 23
yield Metadata(my_asset_1, {"myNum": num})
return "hello :)"
attach_extra_using_metadata = PythonOperator(
task_id="attach_extra_using_metadata",
python_callable=attach_extra_using_metadata_func,
outlets=[my_asset_1]
)
You can also access outlet_events from the Airflow context directly to add an extra dictionary to an asset event.
- TaskFlow API
- Traditional syntax
# from airflow.sdk import Asset, Metadata, task
my_asset_2 = Asset("x-asset2")
@task(outlets=[my_asset_2])
def use_outlet_events(**context):
num = 19
context["outlet_events"][my_asset_2].extra = {"my_num": num}
return "hello :)"
use_outlet_events()
# from airflow.providers.standard.operators.python import PythonOperator
# from airflow.sdk import Asset
my_asset_2 = Asset("x-asset2")

def use_outlet_events_func(**context):
    num = 19
    context["outlet_events"][my_asset_2].extra = {"my_num": num}
    return "hello :)"

use_outlet_events = PythonOperator(
    task_id="use_outlet_events",
    python_callable=use_outlet_events_func,
    outlets=[my_asset_2],
)
Asset extras can be viewed in the Airflow UI in the asset graph of an asset.
Retrieving asset information in a downstream task
Extras can be programmatically retrieved from within Airflow tasks. Any Airflow task instance in a DAG run has access to the list of assets that were involved in triggering that specific DAG run (triggering_asset_events). Additionally, you can give any Airflow task access to all asset events of a specific asset by providing the asset to the task's inlets parameter. Defining inlets does not affect the schedule of the DAG.
To access all asset events that were involved in triggering a DAG run within a TaskFlow API task, pull them from the Airflow context. In a traditional operator, you can use Jinja templating in any templateable field of the operator to pull information from the Airflow context.
- TaskFlow API
- Traditional syntax
# from airflow.sdk import task
@task
def get_extra_triggering_run(**context):
# all events that triggered this specific DAG run
triggering_asset_events = context["triggering_asset_events"]
# the loop below won't run if the DAG is manually triggered
for asset, asset_list in triggering_asset_events.items():
print(asset, asset_list)
print(asset_list[0].extra)
# you can also fetch the run_id and other information about the upstream DAGs
print(asset_list[0].source_run_id)
# from airflow.providers.standard.operators.bash import BashOperator
get_extra_triggering_run_bash = BashOperator(
task_id="get_extra_triggering_run_bash",
# This statement errors when there are no triggering events, for example in a manual DAG run!
bash_command="echo {{ (triggering_asset_events.values() | first | first).extra }} ",
)
If you want to access asset extras independently of which asset events triggered a DAG run, you can directly provide an asset to a task as an inlet. In a TaskFlow API task, you can fetch the inlet_events from the Airflow context.
# from airflow.sdk import Asset, task
my_asset_2 = Asset("x-asset2")
# note that my_asset_2 does not need to be part of the DAGs schedule
# you can provide as many inlets as you wish
@task(inlets=[my_asset_2])
def get_extra_inlet(**context):
# inlet_events are listed earliest to latest by timestamp
asset_events = context["inlet_events"][my_asset_2]
# protect against the asset not existing
if len(asset_events) == 0:
print(f"No asset_events for {my_asset_2.uri}")
else:
# accessing the latest asset event for this asset
# if the extra does not exist, return None
my_extra = asset_events[-1].extra
print(my_extra)
get_extra_inlet()
Note that you can programmatically retrieve information from asset aliases as well, see Fetching information from previously emitted asset events through resolved asset aliases for more information.
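As a minimal sketch of that pattern, a task can list an asset alias in its inlets and read the attached events from inlet_events. This assumes asset events were previously attached to the my_alias alias by an upstream DAG, as shown in the Use asset aliases section.

```python
# A minimal sketch of reading asset events through a resolved asset alias.
# Assumes events were attached to the "my_alias" alias by an upstream DAG,
# as shown in the Use asset aliases section.
from airflow.sdk import AssetAlias, dag, task

my_alias_name = "my_alias"


@dag
def read_alias_events_dag():
    @task(inlets=[AssetAlias(my_alias_name)])
    def get_extras_from_alias(**context):
        # asset events attached to the alias, listed earliest to latest
        for event in context["inlet_events"][AssetAlias(my_alias_name)]:
            print(event.extra)

    get_extras_from_alias()


read_alias_events_dag()
```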
Asset schedules
Any number of assets can be provided to the schedule parameter. There are three types of asset schedules:
- schedule=[Asset("a"), Asset("b")]: Providing one or more assets as a list. The DAG is scheduled to run after all assets in the list have received at least one update.
- schedule=(Asset("a") | Asset("b")): Using AND (&) and OR (|) operators to create a conditional asset expression. Note that asset expressions are enclosed in parentheses ().
- AssetOrTimeSchedule: Combining time-based scheduling with asset expressions. See Combined asset and time-based scheduling.
When scheduling DAGs based on assets, keep the following in mind:
- Consumer DAGs that are scheduled on an asset are triggered every time a task that updates that asset completes successfully. For example, if task1 and task2 both produce asset_a, a consumer DAG of asset_a runs twice: first when task1 completes, and again when task2 completes.
- Consumer DAGs scheduled on an asset are triggered as soon as the first task with that asset as an outlet finishes, even if there are downstream producer tasks that also operate on the asset.
- Consumer DAGs scheduled on multiple assets run as soon as their expression is fulfilled by at least one asset event per asset in the expression. This means that it does not matter to the consuming DAG whether an asset received additional updates in the meantime; it consumes all queued events for one asset as one input. See Multiple Assets for more information.
- A consumer DAG that is paused ignores all updates to assets that occurred while it was paused; it starts with a blank slate when unpaused.
- DAGs that are triggered by assets do not have the concept of a data interval. If you need information about the triggering event in your downstream DAG, you can use the parameter triggering_asset_events from the context. This parameter provides a list of all the triggering asset events with the parameters [timestamp, source_dag_id, source_task_id, source_run_id, source_map_index]. See Retrieving asset information in a downstream task for an example.
Conditional asset scheduling
You can use logical operators to combine any number of assets provided to the schedule parameter. The logical operators supported are | for OR and & for AND.
For example, to schedule a DAG on an update to either asset1, asset2, asset3, or asset4, you can use the following syntax. Note that the full statement is wrapped in ().
- TaskFlow API
- Traditional syntax
from airflow.sdk import Asset, dag
@dag(
schedule=(
Asset("asset1")
| Asset("asset2")
| Asset("asset3")
| Asset("asset4")
), # Use () instead of [] to be able to use conditional asset scheduling!
)
def downstream1_on_any():
# your tasks here
downstream1_on_any()
from airflow.sdk import Asset, DAG
with DAG(
dag_id="downstream1_on_any",
schedule=(
Asset("asset1")
| Asset("asset2")
| Asset("asset3")
| Asset("asset4")
), # Use () instead of [] to be able to use conditional asset scheduling!
):
# your tasks here
The downstream1_on_any DAG is triggered whenever any of the assets asset1, asset2, asset3, or asset4 is updated. When you click x of 4 Assets updated in the DAGs view, you can see the asset expression that defines the schedule.
You can also combine the logical operators to create more complex expressions. For example, to schedule a DAG on an update to either asset1 or asset2, and either asset3 or asset4, you can use the following syntax:
- TaskFlow API
- Traditional syntax
from airflow.sdk import Asset, dag
@dag(
schedule=(
(Asset("asset1") | Asset("asset2"))
& (Asset("asset3") | Asset("asset4"))
), # Use () instead of [] to be able to use conditional asset scheduling!
)
def downstream2_one_in_each_group():
# your tasks here
downstream2_one_in_each_group()
from airflow.sdk import Asset, DAG
with DAG(
dag_id="downstream2_one_in_each_group",
schedule=(
(Asset("asset1") | Asset("asset2"))
& (Asset("asset3") | Asset("asset4"))
), # Use () instead of [] to be able to use conditional asset scheduling!
):
# your tasks here
Combined asset and time-based scheduling
You can combine asset-based scheduling with time-based scheduling using the AssetOrTimeSchedule timetable. A DAG scheduled with this timetable runs either when its timetable condition is met or when its asset condition is met.
The DAG shown below runs on a time-based schedule defined by the 0 0 * * * cron expression, which is every day at midnight. The DAG also runs when either asset3 or asset4 is updated.
- TaskFlow API
- Traditional syntax
from airflow.sdk import Asset, dag, task
from pendulum import datetime
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
@dag(
start_date=datetime(2025, 3, 1),
schedule=AssetOrTimeSchedule(
timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
assets=(Asset("asset3") | Asset("asset4")),
# Use () instead of [] to be able to use conditional asset scheduling!
)
)
def toy_downstream3_asset_and_time_schedule():
# your tasks here
toy_downstream3_asset_and_time_schedule()
from airflow.sdk import Asset, DAG
from pendulum import datetime
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
with DAG(
dag_id="toy_downstream3_asset_and_time_schedule",
start_date=datetime(2024, 3, 1),
schedule=AssetOrTimeSchedule(
timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
assets=(Asset("asset3") | Asset("asset4")),
# Use () instead of [] to be able to use conditional asset scheduling!
)
):
# your tasks here
Example implementation
In the following example, the write_instructions_to_file and write_info_to_file tasks are both producer tasks because they have defined outlets.
- TaskFlow API
- Traditional syntax
from airflow.sdk import Asset, dag, task
API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")
@dag
def assets_producer_dag():
@task
def get_cocktail(api):
import requests
r = requests.get(api)
return r.json()
@task(outlets=[INSTRUCTIONS])
def write_instructions_to_file(response):
cocktail_name = response["drinks"][0]["strDrink"]
cocktail_instructions = response["drinks"][0]["strInstructions"]
msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"
f = open("include/cocktail_instructions.txt", "a")
f.write(msg)
f.close()
@task(outlets=[INFO])
def write_info_to_file(response):
import time
time.sleep(30)
cocktail_name = response["drinks"][0]["strDrink"]
cocktail_category = response["drinks"][0]["strCategory"]
alcohol = response["drinks"][0]["strAlcoholic"]
msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
f = open("include/cocktail_info.txt", "a")
f.write(msg)
f.close()
cocktail = get_cocktail(api=API)
write_instructions_to_file(cocktail)
write_info_to_file(cocktail)
assets_producer_dag()
from airflow.sdk import Asset, DAG
from airflow.providers.standard.operators.python import PythonOperator
API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")
def get_cocktail_func(api):
import requests
r = requests.get(api)
return r.json()
def write_instructions_to_file_func(response):
cocktail_name = response["drinks"][0]["strDrink"]
cocktail_instructions = response["drinks"][0]["strInstructions"]
msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"
f = open("include/cocktail_instructions.txt", "a")
f.write(msg)
f.close()
def write_info_to_file_func(response):
import time
time.sleep(30)
cocktail_name = response["drinks"][0]["strDrink"]
cocktail_category = response["drinks"][0]["strCategory"]
alcohol = response["drinks"][0]["strAlcoholic"]
msg = (
f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
)
f = open("include/cocktail_info.txt", "a")
f.write(msg)
f.close()
with DAG(
dag_id="assets_producer_dag",
render_template_as_native_obj=True,
):
get_cocktail = PythonOperator(
task_id="get_cocktail",
python_callable=get_cocktail_func,
op_kwargs={"api": API},
)
write_instructions_to_file = PythonOperator(
task_id="write_instructions_to_file",
python_callable=write_instructions_to_file_func,
op_kwargs={"response": "{{ ti.xcom_pull(task_ids='get_cocktail') }}"},
outlets=[INSTRUCTIONS],
)
write_info_to_file = PythonOperator(
task_id="write_info_to_file",
python_callable=write_info_to_file_func,
op_kwargs={"response": "{{ ti.xcom_pull(task_ids='get_cocktail') }}"},
outlets=[INFO],
)
get_cocktail >> write_instructions_to_file >> write_info_to_file
A consumer DAG runs whenever the asset(s) it is scheduled on are updated by a producer task, rather than running on a time-based schedule. For example, if you have a DAG that should run when the INSTRUCTIONS and INFO assets are updated, you define the DAG's schedule using those two assets.
Any DAG that is scheduled on an asset is considered a consumer DAG even if that DAG doesn't actually access the referenced asset. In other words, it's up to you as the DAG author to correctly reference and use assets.
- TaskFlow API
- Traditional syntax
from airflow.sdk import Asset, dag, task
INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")
@dag(schedule=[INSTRUCTIONS, INFO]) # Scheduled on both assets
def assets_consumer_dag():
@task
def read_about_cocktail():
cocktail = []
for filename in ("info", "instructions"):
with open(f"include/cocktail_{filename}.txt", "r") as f:
contents = f.readlines()
cocktail.append(contents)
return [item for sublist in cocktail for item in sublist]
read_about_cocktail()
assets_consumer_dag()
from airflow.sdk import DAG, Asset
from airflow.providers.standard.operators.python import PythonOperator
INSTRUCTIONS = Asset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Asset("file://localhost/airflow/include/cocktail_info.txt")
def read_about_cocktail_func():
cocktail = []
for filename in ("info", "instructions"):
with open(f"include/cocktail_{filename}.txt", "r") as f:
contents = f.readlines()
cocktail.append(contents)
return [item for sublist in cocktail for item in sublist]
with DAG(
dag_id="assets_consumer_dag",
schedule=[INSTRUCTIONS, INFO], # Scheduled on both Assets
):
PythonOperator(
task_id="read_about_cocktail",
python_callable=read_about_cocktail_func,
)