Advanced asset-based scheduling in Apache Airflow®

With Assets, Dags that access the same data can have explicit, visible relationships, and Dags can be scheduled based on updates to these assets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.

The basics of asset-based scheduling, including fundamental concepts and terminology, are covered in Basic asset-based scheduling in Apache Airflow®. This guide covers advanced asset-based scheduling concepts.

In this guide, you’ll learn:

  • How to use conditional asset scheduling to schedule a Dag based on an asset expression.
  • How to use combined asset and time-based scheduling to schedule a Dag both on a time-based schedule (cron or any other Timetable) and whenever an asset expression is fulfilled.
  • How to use partitioned asset schedules.
  • How to attach extra information to, and retrieve extra information from, asset events.
  • How to use asset aliases to create dynamic asset schedules.
  • How to use asset listeners to run code when certain asset events occur anywhere in your Airflow instance.

Assets can be used to schedule a Dag based on messages in a message queue. This sub-type of data-aware scheduling is called event-driven scheduling. See Schedule Dags based on Events in a Message Queue for more information.

Assets are a fundamental scheduling paradigm in Airflow. To learn more about when to use assets vs other scheduling paradigms, check out the free Apache Airflow® orchestration paradigms ebook.

This guide covers data-aware schedules with Assets. For more information on the @asset decorator shorthand, see @asset syntax in Apache Airflow®.

Assumed knowledge

To get the most out of this guide, you should have existing knowledge of:

Advanced asset concepts

When using asset-based scheduling, updates to assets create asset events. These updates can be produced in several ways, the most common being to set a task's outlets parameter to a list of assets to update upon successful task completion. Dags can be scheduled based on asset events created for one or more assets, and tasks can be given access to all events attached to an asset by defining the asset as one of their inlets.

In addition to these fundamental concepts covered in the Basic asset-based scheduling in Apache Airflow® guide, when using advanced techniques for asset-based scheduling, you should understand the following terms:

  • Asset expression: a logical expression using AND (&) and OR (|) operators to define the schedule of a Dag scheduled on updates to several assets.
  • AssetAlias: an object that can be associated with one or more assets and used to create schedules based on assets created at runtime; see Asset aliases.
  • Metadata: a class to attach extra information to an asset event from within the producer task. This functionality can be used to pass asset event-related metadata between tasks; see Attach extra information and Retrieve extra information.
  • AssetWatcher: a class used in event-driven scheduling to watch for a TriggerEvent caused by a message in a message queue.
  • Queued asset event: It is common to schedule a Dag to run as soon as a set of assets has received at least one update each. While some of the required asset events are still missing, the asset events that have already arrived for the other assets the Dag is scheduled on are queued asset events. A queued asset event is defined by its asset, its timestamp, and the Dag it is queued for. One asset event can create queued asset events for several Dags. You can access the queued asset events for a specific Dag or a specific asset programmatically using the Airflow REST API.
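
Queued asset events can be inspected programmatically. The sketch below assumes an API token and the v2 REST API layout; the endpoint path (/api/v2/dags/{dag_id}/assets/queuedEvents) and the queued_events response key are assumptions — verify them against the REST API reference of your Airflow version.

```python
import json
from urllib.request import Request, urlopen


def queued_events_url(base_url: str, dag_id: str) -> str:
    """Build the (assumed) queued-asset-events endpoint for a Dag."""
    return f"{base_url}/api/v2/dags/{dag_id}/assets/queuedEvents"


def get_queued_asset_events(base_url: str, dag_id: str, token: str) -> list:
    """Fetch all queued asset events currently held for a Dag.

    The bearer-token auth scheme and response shape are assumptions.
    """
    req = Request(
        queued_events_url(base_url, dag_id),
        headers={"Authorization": f"Bearer {token}"},
    )
    with urlopen(req) as resp:
        return json.load(resp).get("queued_events", [])
```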

The Assets tab in the Airflow UI provides a list of active assets in your Airflow environment with an asset graph for each asset showing its dependencies on Dags and other assets; see Asset graph for more information.

Airflow is only aware of updates to assets that are made by tasks, API calls, or the Airflow UI; see Methods to update an asset. It does not monitor updates to the underlying data that occur outside of Airflow. For example, Airflow will not notice if you manually add a file to an S3 bucket referenced by an asset. See When not to use Airflow assets for more information.

Asset events are only registered by Dags or listeners in the same Airflow environment. If you want to create cross-Deployment dependencies with assets, you need to use the Airflow REST API to create an asset event in the Airflow environment where your downstream Dag is located. See Cross-deployment dependencies for an example implementation on Astro.

Conditional asset scheduling

When using basic asset-based scheduling, you can schedule a Dag based on one or more assets by providing a list of assets to the schedule parameter. If you provide more than one asset, the Dag will run when all of the assets have received at least one update each.

For more complex scheduling needs, you can use conditional asset scheduling to schedule a Dag based on an asset expression.

An asset expression is a logical expression using AND (&) and OR (|) operators to define the schedule of a Dag scheduled on updates to several assets. The asset expression is given to the schedule parameter, wrapped in ().

For example, to schedule a Dag on an update to either asset1, asset2, asset3, or asset4, you can use the following syntax.

Taskflow
from airflow.sdk import Asset, dag

@dag(
    schedule=(
        Asset("asset1")
        | Asset("asset2")
        | Asset("asset3")
        | Asset("asset4")
    ),  # Use () instead of [] to be able to use conditional asset scheduling!
)
def downstream1_on_any():

    # your tasks here

downstream1_on_any()
Traditional
from airflow.sdk import Asset, DAG

with DAG(
    dag_id="downstream1_on_any",
    schedule=(
        Asset("asset1")
        | Asset("asset2")
        | Asset("asset3")
        | Asset("asset4")
    ),  # Use () instead of [] to be able to use conditional asset scheduling!
):

    # your tasks here

The downstream1_on_any Dag is triggered whenever any of the assets asset1, asset2, asset3, or asset4 is updated. The schedule of the Dag is listed as x of 4 Assets updated in the Airflow UI; you can see the asset expression that defines the schedule in a pop-up when clicking on the schedule.

Screenshot of the Airflow UI with a pop up showing the asset expression for the downstream1_on_any Dag listing the 4 assets under "any"

You can also combine the logical operators to create more complex expressions. For example, to schedule a Dag on an update to either asset1 or asset2 and either asset3 or asset4, you can use the following syntax:

Taskflow
from airflow.sdk import Asset, dag

@dag(
    schedule=(
        (Asset("asset1") | Asset("asset2"))
        & (Asset("asset3") | Asset("asset4"))
    ),  # Use () instead of [] to be able to use conditional asset scheduling!
)
def downstream2_one_in_each_group():

    # your tasks here

downstream2_one_in_each_group()
Traditional
from airflow.sdk import Asset, DAG

with DAG(
    dag_id="downstream2_one_in_each_group",
    schedule=(
        (Asset("asset1") | Asset("asset2"))
        & (Asset("asset3") | Asset("asset4"))
    ),  # Use () instead of [] to be able to use conditional asset scheduling!
):

    # your tasks here
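
The triggering logic of the expression above can be modeled in plain Python: the Dag runs once the set of assets with unconsumed updates satisfies the boolean condition. A minimal, Airflow-free sketch using asset names as strings:

```python
def condition_met(updated: set) -> bool:
    """Model (asset1 | asset2) & (asset3 | asset4) over a set of updated assets."""
    left = bool({"asset1", "asset2"} & updated)   # at least one update in the first group
    right = bool({"asset3", "asset4"} & updated)  # at least one update in the second group
    return left and right


# one update in each group satisfies the condition
print(condition_met({"asset1", "asset4"}))  # True
# two updates in the same group do not
print(condition_met({"asset1", "asset2"}))  # False
```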

Combined asset and time-based scheduling

You can combine asset-based and time-based scheduling using the AssetOrTimeSchedule timetable. A Dag scheduled with this timetable runs either when its timetable condition is met or when its asset condition is met. You can use any Timetable in the timetable parameter.

The Dag shown below runs on a time-based schedule defined by the 0 0 * * * cron expression, which is every day at midnight. The Dag also runs when either asset3 or asset4 is updated.

Taskflow
from airflow.sdk import Asset, dag
from pendulum import datetime
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

@dag(
    start_date=datetime(2025, 3, 1),
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        assets=(Asset("asset3") | Asset("asset4")),
        # Use () instead of [] to be able to use conditional asset scheduling!
    ),
)
def toy_downstream3_asset_and_time_schedule():

    # your tasks here

toy_downstream3_asset_and_time_schedule()
Traditional
from airflow.sdk import Asset, DAG
from pendulum import datetime
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

with DAG(
    dag_id="toy_downstream3_asset_and_time_schedule",
    start_date=datetime(2025, 3, 1),
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        assets=(Asset("asset3") | Asset("asset4")),
        # Use () instead of [] to be able to use conditional asset scheduling!
    ),
):
    # your tasks here

Partitioned asset schedules

Airflow 3.2 introduced the concept of partitioned Dag runs and partitioned asset events, which have a partition_key attached to them. The partition key can be used in tasks in a partitioned Dag run to partition data, for example in a SQL statement.

To schedule a Dag based on partitioned asset events, you set its schedule parameter to an instance of PartitionedAssetTimetable.

from airflow.sdk import dag, PartitionedAssetTimetable, Asset

@dag(schedule=PartitionedAssetTimetable(assets=Asset("my_partitioned_asset")))

This Dag is only triggered when my_partitioned_asset is updated by a partitioned asset event, not by regular asset events. You can modify the partition key by providing a partition_key_mapper to the PartitionedAssetTimetable instance, for example to change the time grain of the partition key to daily or weekly.
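
For example, a mapper that coarsens hourly partition keys to daily ones could look like the sketch below. The exact signature PartitionedAssetTimetable expects for partition_key_mapper is not shown in this guide, so treating it as a plain callable on the key string is an assumption — verify against the Airflow reference for your version.

```python
# Hypothetical mapper: coarsen an hourly partition key such as
# "2025-03-01T14" to the daily key "2025-03-01". Whether
# PartitionedAssetTimetable accepts a plain callable like this is an
# assumption - check your Airflow version's documentation.
def daily_partition_key_mapper(partition_key: str) -> str:
    return partition_key.split("T")[0]


print(daily_partition_key_mapper("2025-03-01T14"))  # 2025-03-01
```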

There are three ways to create a partitioned asset event:

  • By updating an asset manually in the Airflow UI and providing a partition_key in the asset event creation dialog.
  • By updating an asset using the Airflow REST API and providing a partition_key in the request body.
  • By updating an asset using the outlets parameter of a task in a Dag that is scheduled using a CronPartitionTimetable timetable.

For more information on partitioned Dag runs and asset events, see the Partitioned Dag runs and asset events in Apache Airflow® guide.

Partitioned asset events created by a task in a Dag using the CronPartitionTimetable timetable are intended for partition-aware downstream scheduling, and do not trigger non-partition-aware Dags.

Asset event extras

You can attach extra information to an asset event and retrieve it in downstream tasks. This is useful for passing metadata between tasks, for example information about the asset you are working with.

Attach extra information

When updating an asset in the Airflow UI or making a POST request to the Airflow REST API, you can attach extra information to the asset event by providing an extra JSON payload.

You can add extra information from within the producing task either by using the Metadata class or by accessing outlet_events from the Airflow context. The information passed needs to be JSON-serializable.

To use the Metadata class to attach information to an asset, follow the example in the code snippet below. Make sure that the asset used in the metadata class is also defined as an outlet in the producer task.

Taskflow
from airflow.sdk import Asset, Metadata, task

my_asset_1 = Asset("x-asset1")

@task(outlets=[my_asset_1])
def attach_extra_using_metadata():
    num = 23
    yield Metadata(my_asset_1, {"myNum": num})

    return "hello :)"

attach_extra_using_metadata()
Traditional
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Asset, Metadata

my_asset_1 = Asset("x-asset1")

def attach_extra_using_metadata_func():
    num = 23
    yield Metadata(my_asset_1, {"myNum": num})

    return "hello :)"

attach_extra_using_metadata = PythonOperator(
    task_id="attach_extra_using_metadata",
    python_callable=attach_extra_using_metadata_func,
    outlets=[my_asset_1],
)

You can also access the outlet_events from the Airflow context directly to add an extra dictionary to an asset event.

Taskflow
from airflow.sdk import Asset, task

my_asset_2 = Asset("x-asset2")

@task(outlets=[my_asset_2])
def use_outlet_events(outlet_events):  # outlet_events is pulled out of the Context
    num = 19
    outlet_events[my_asset_2].extra = {"my_num": num}

    return "hello :)"

use_outlet_events()
Traditional
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Asset

my_asset_2 = Asset("x-asset2")

def use_outlet_events_func(**context):
    num = 19
    context["outlet_events"][my_asset_2].extra = {"my_num": num}

    return "hello :)"

use_outlet_events = PythonOperator(
    task_id="use_outlet_events",
    python_callable=use_outlet_events_func,
    outlets=[my_asset_2],
)

Asset extras can be viewed in the Airflow UI in the asset graph of an asset.

Screenshot of the asset extra information.

Retrieve extra information

Extras attached to asset events can be programmatically retrieved from within Airflow tasks. Any Airflow task instance in a Dag run has access to the list of assets that were involved in triggering that specific Dag run. Additionally, you can give any Airflow task access to all asset events of a specific asset by providing the asset to the task’s inlets parameter. Defining inlets does not affect the schedule of the Dag.

To access all asset events that were involved in triggering a Dag run within a TaskFlow API task, you can pull triggering_asset_events from the Airflow context. In a traditional operator, you can use Jinja templating in any templateable field of the operator to access information in the Airflow context.

Taskflow
from airflow.sdk import task

@task
def get_extra_triggering_run(triggering_asset_events):
    # triggering_asset_events contains all events that triggered this specific
    # Dag run and is pulled from the Context
    # the loop below won't run if the Dag is manually triggered
    for asset, asset_list in triggering_asset_events.items():
        print(asset, asset_list)
        print(asset_list[0].extra)
        # you can also fetch the run_id and other information about the upstream Dags
        print(asset_list[0].source_run_id)
Traditional
from airflow.providers.standard.operators.bash import BashOperator

get_extra_triggering_run_bash = BashOperator(
    task_id="get_extra_triggering_run_bash",
    # This statement errors when there are no triggering events, for example in a manual Dag run!
    bash_command="echo {{ (triggering_asset_events.values() | first | first).extra }} ",
)

If you want to access asset extras independently of which asset events triggered a Dag run, you can directly provide an asset to a task as an inlet. In a TaskFlow API task, you can fetch the inlet_events from the Airflow context.

from airflow.sdk import Asset, task

my_asset_2 = Asset("x-asset2")

# note that my_asset_2 does not need to be part of the Dag's schedule
# you can provide as many inlets as you wish
@task(inlets=[my_asset_2])
def get_extra_inlet(inlet_events):  # inlet_events is pulled out of the Context
    # inlet_events are listed earliest to latest by timestamp
    asset_events = inlet_events[my_asset_2]
    # protect against the asset not having any events
    if len(asset_events) == 0:
        print(f"No asset_events for {my_asset_2.uri}")
    else:
        # access the latest asset event for this asset
        # if no extra was attached, .extra is None
        my_extra = asset_events[-1].extra
        print(my_extra)

get_extra_inlet()

Asset aliases

You have the option to create asset aliases to schedule Dags based on assets with names generated at runtime. An asset alias is defined by a unique name string and can be used in place of a regular asset in outlets and schedules. Any number of asset events updating different assets can be attached to an asset alias.

There are two ways to add an asset event to an asset alias:

  • Using the Metadata class.
  • Using outlet_events pulled from the Airflow context.

See the code below for examples; note how the name of the asset is determined at runtime inside the producing task.

Metadata
from airflow.sdk import Asset, AssetAlias, Metadata, task

my_alias_name = "my_alias"

@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_metadata():
    bucket_name = "my-bucket"  # determined at runtime, for example based on upstream input
    yield Metadata(
        asset=Asset(f"updated_{bucket_name}"),
        extra={"k": "v"},  # extra has to be provided, can be {}
        alias=AssetAlias(my_alias_name),
    )

attach_event_to_alias_metadata()
Context
from airflow.sdk import Asset, AssetAlias, task

my_alias_name = "my_alias"

@task(outlets=[AssetAlias(my_alias_name)])
def attach_event_to_alias_context(outlet_events):  # outlet_events is pulled out of the Context
    bucket_name = "my-other-bucket"  # determined at runtime, for example based on upstream input
    outlet_events[AssetAlias(my_alias_name)].add(
        Asset(f"updated_{bucket_name}"), extra={"k": "v"}
    )  # extra is optional

attach_event_to_alias_context()

In the consuming Dag you can use an asset alias in place of a regular asset.

from airflow.sdk import AssetAlias, dag
from airflow.providers.standard.operators.empty import EmptyOperator

my_alias_name = "my_alias"

@dag(schedule=[AssetAlias(my_alias_name)])
def my_consumer_dag():

    EmptyOperator(task_id="empty_task")

my_consumer_dag()

Once the my_producer_dag containing the attach_event_to_alias_metadata task completes successfully, reparsing of all Dags scheduled on the asset alias my_alias is automatically triggered. This reparsing step attaches the updated_{bucket_name} asset to the my_alias asset alias and the schedule resolves, triggering one run of the my_consumer_dag.

Any further asset event for the updated_{bucket_name} asset will now trigger the my_consumer_dag. If you attach asset events for several assets to the same asset alias, a Dag scheduled on that asset alias will run as soon as any of the assets that were ever attached to the asset alias receive an update.

See Scheduling based on asset aliases for more information and examples of using asset aliases.

To use asset aliases with traditional operators, you need to attach the asset event to the alias inside the operator logic. If you are using operators besides the PythonOperator, you can either do so in a custom operator’s .execute method or by passing a post_execute callable to existing operators (experimental). Use outlet_events when attaching asset events to aliases in traditional or custom operators. Note that for deferrable operators, attaching an asset event to an alias is only supported in the execute_complete or post_execute method.

from airflow.sdk import Asset, AssetAlias
from airflow.providers.standard.operators.bash import BashOperator

my_alias_name = "my_alias"

def _attach_event_to_alias(context, result):  # result = the return value of the execute method
    # use any logic to determine the URI
    uri = "s3://my-bucket/my_file.txt"
    context["outlet_events"][AssetAlias(my_alias_name)].add(Asset(uri))

BashOperator(
    task_id="t2",
    bash_command="echo hi",
    outlets=[AssetAlias(my_alias_name)],
    post_execute=_attach_event_to_alias,  # using the post_execute parameter is experimental
)
Click to view an example of a custom operator attaching an asset event to an asset alias.
1"""
2### AssetAlias in a custom operator
3"""
4
5from airflow.sdk import Asset, AssetAlias, dag
6from airflow.sdk.bases.operator import BaseOperator
7import logging
8
9t_log = logging.getLogger("airflow.task")
10
11my_alias_name = "my-alias"
12
13
14# custom operator producing to an asset alias
15class MyOperator(BaseOperator):
16 """
17 Simple example operator that attaches an asset event to an asset alias.
18 :param my_bucket_name: (str) The name of the bucket to use in the asset name.
19 """
20
21 # define the .__init__() method that runs when the DAG is parsed
22 def __init__(self, my_bucket_name, my_alias_name, *args, **kwargs):
23 # initialize the parent operator
24 super().__init__(*args, **kwargs)
25 # assign class variables
26 self.my_bucket_name = my_bucket_name
27 self.my_alias_name = my_alias_name
28
29 def execute(self, context):
30
31 # add your custom operator logic here
32
33 # use any logic to derive the dataset URI
34 my_asset_name = f"updated_{self.my_bucket_name}"
35 context["outlet_events"][AssetAlias(self.my_alias_name)].add(
36 Asset(my_asset_name)
37 )
38
39 return "hi :)"
40
41 # define the .post_execute() method that runs after the execute method (optional)
42 # result is the return value of the execute method
43 def post_execute(self, context, result=None):
44 # write to Airflow task logs
45 self.log.info("Post-execution step")
46
47 # It is also possible to add events to the alias in the post_execute method
48
49
50@dag
51def asset_alias_custom_operator():
52
53 MyOperator(
54 task_id="t1",
55 my_bucket_name="my-bucket",
56 my_alias_name=my_alias_name,
57 outlets=[AssetAlias(my_alias_name)],
58 )
59
60
61asset_alias_custom_operator()

Asset listeners

A listener is a type of Airflow plugin that can be used to run custom code when certain events occur anywhere in your Airflow instance. There are four listener hooks relating to asset events:

  • on_asset_created: runs when a new asset is created.
  • on_asset_alias_created: runs when a new asset alias is created.
  • on_asset_changed: runs when any asset change occurs.
  • on_asset_event_emitted: runs when an asset event is emitted, generally called together with on_asset_changed.

To implement a listener, create a @hookimpl-decorated function for your listener hook of choice and then register it in an Airflow plugin.

from airflow.plugins_manager import AirflowPlugin
from airflow.listeners.types import AssetEvent
from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias
from airflow.listeners import hookimpl


@hookimpl
def on_asset_created(asset: SerializedAsset):
    """Execute when a new asset is created."""


@hookimpl
def on_asset_alias_created(asset_alias: SerializedAssetAlias):
    """Execute when a new asset alias is created."""


@hookimpl
def on_asset_changed(asset: SerializedAsset):
    """Execute when an asset change is registered."""


@hookimpl
def on_asset_event_emitted(asset_event: AssetEvent):
    """
    Execute when an asset event is emitted.

    This is generally called together with ``on_asset_changed``, but with
    information on the emitted event instead.
    """


class MyListenerPlugin(AirflowPlugin):
    name = "my_listener_plugin"

    listeners = [
        on_asset_created,
        on_asset_alias_created,
        on_asset_changed,
        on_asset_event_emitted,
    ]

See the Airflow documentation for more information on listeners.