Partitioned Dag runs and asset events in Apache Airflow®

Airflow 3.2 introduced the concept of partitioned Dag runs and partitioned asset events. A partitioned Dag run is a Dag run with a partition_key attached to it, and a partitioned asset event is an asset event that has a partition_key attached to it. Any string can be a partition key, with time-based partition keys being the most common. Partition keys can be used in tasks in a partitioned Dag run to partition data, for example in a SQL statement.

In this guide, you’ll learn:

  • When to use partitioned Dag runs and asset events.
  • How to create a partitioned Dag run.
  • How to create a partitioned asset event.
  • How to schedule a Dag based on partitioned asset events.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

When to use partitions

Situations in which you should consider using partitioned Dag runs and asset events are:

  • When you want to process data from a specific time period in every Dag run. For example, if you have a Dag that runs once a day and should always process the data from the previous day.
  • When you have a Dag that is scheduled based on asset events, within which you want to process data from a specific time period. You can use the partition_key to partition the data inside of the downstream Dag run. The partition_key propagates from the upstream Dag run to the downstream Dag run, and you can adjust its grain with partition key mappers.
  • When you want to process data for a specific segment in manual or API-triggered Dag runs. For example, if your Dag generates a report for a specific department, you can set a different partition_key for each Dag run in the Trigger Dag run config or the API request body.

Create a partitioned Dag run

A partitioned Dag run is a Dag run with a partition_key attached to it. There are four ways to create a partitioned Dag run:

  • By running a Dag manually in the Airflow UI and attaching a partition_key in the Trigger Dag run config.

    Screenshot of the Airflow UI showing the Trigger Dag run config with the partition key input field.

  • By running a Dag using the Airflow REST API and attaching a partition_key in the request body.

  • By fulfilling the asset schedule condition of a Dag that uses the PartitionedAssetTimetable timetable.

  • By using the CronPartitionTimetable timetable. Note that only Dag runs of type scheduled and backfill are partitioned; manual runs are not, unless you provide a partition_key in the Trigger Dag run config.

You can access partition keys from within any task in a partitioned Dag run; see Accessing partition keys for more information.
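For the REST API option, the request body might look like the following sketch. The endpoint path and the fields other than partition_key are assumptions based on the standard Airflow REST API trigger-Dag-run endpoint; only the partition_key field is taken from this guide.

```python
import json

# Hypothetical request body for triggering a partitioned Dag run via the
# REST API. The endpoint path below is an assumption; the partition_key
# field is described in this guide.
payload = {
    "logical_date": None,
    "partition_key": "Finance",  # any string can be a partition key
    "conf": {},
}
body = json.dumps(payload)
print(body)

# To send it against a running Airflow instance (requires auth):
# import requests
# requests.post(
#     "http://localhost:8080/api/v2/dags/my_dag/dagRuns",
#     headers={"Authorization": "Bearer <token>", "Content-Type": "application/json"},
#     data=body,
# )
```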

CronPartitionTimetable

The CronPartitionTimetable is a timetable that creates partitioned Dag runs whose partition_key is derived automatically from the run_after timestamp of each scheduled and backfilled Dag run.

from airflow.sdk import dag, CronPartitionTimetable

@dag(
    schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
)

You can offset the partition key by providing the run_offset parameter to the CronPartitionTimetable instance. The offset is measured in units of the cron interval: if your Dag runs once every hour, a run_offset of -12 shifts the partition key back by 12 hours. So the Dag run with the run id 2026-03-16T09:00:00 will have a partition key of 2026-03-15T21:00:00.

from airflow.sdk import dag, CronPartitionTimetable

@dag(
    schedule=CronPartitionTimetable("0 * * * *", timezone="UTC", run_offset=-12),
)
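The offset arithmetic can be illustrated with plain datetime math. This is a sketch of the behavior described above, not the timetable's internal implementation:

```python
from datetime import datetime, timedelta

# run_offset is expressed in units of the cron interval; with an hourly
# schedule, run_offset=-12 shifts the partition key back by 12 hours.
run_after = datetime(2026, 3, 16, 9, 0, 0)
run_offset = -12
partition_key = (run_after + timedelta(hours=run_offset)).isoformat()
print(partition_key)  # 2026-03-15T21:00:00
```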

Create a partitioned asset event

A partitioned asset event is an asset event that has a partition_key attached to it. There are three ways to create a partitioned asset event:

  • By updating an asset manually in the Airflow UI and providing a partition_key in the Asset Event creation dialog.

    Screenshot of the Airflow UI showing the Asset Event creation dialog with the partition key input field.

  • By updating an asset using the Airflow REST API and providing a partition_key in the request body.

  • By updating an asset using the outlets parameter of a task in a Dag that is scheduled using a CronPartitionTimetable timetable.

Partitioned asset events created by a task in a Dag using the CronPartitionTimetable timetable are intended for partition-aware downstream scheduling, and do not trigger non-partition-aware Dags.

The example below shows a Dag that is scheduled using a CronPartitionTimetable timetable. In every scheduled or backfilled run of this Dag, successful completion of the my_task_partitioned_upstream task will create a partitioned asset event for the my_partitioned_asset asset. The partition_key will be the run_after timestamp of the Dag run.

from airflow.sdk import dag, task, Asset, CronPartitionTimetable


my_asset = Asset("my_partitioned_asset")


@dag(
    schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
)
def my_dag_partitioned_upstream():
    @task(outlets=[my_asset])
    def my_task_partitioned_upstream(**context):
        pass

    my_task_partitioned_upstream()


my_dag_partitioned_upstream()

Schedule on a partitioned asset

To schedule a Dag on a partitioned asset, you set its schedule parameter to an instance of PartitionedAssetTimetable.

from airflow.sdk import dag, PartitionedAssetTimetable, Asset

@dag(schedule=PartitionedAssetTimetable(assets=Asset("my_partitioned_asset")))

This Dag will run whenever the my_partitioned_asset is updated by a partitioned asset event. It will not run based on regular asset events produced for the my_partitioned_asset asset.

You can modify the grain of the partition key by providing a partition_key_mapper to the PartitionedAssetTimetable instance. For example, to partition the data by day, you can use the StartOfDayMapper to normalize the partition key to the day in the format YYYY-MM-DD. See Partition key mappers for more information on the available partition key mappers.

from airflow.sdk import dag, task, PartitionedAssetTimetable, Asset, StartOfDayMapper

my_partitioned_asset = Asset("my_partitioned_asset")

@dag(
    schedule=PartitionedAssetTimetable(
        assets=my_partitioned_asset,
        partition_mapper_config={my_partitioned_asset: StartOfDayMapper()},
    )
)
def my_dag():

    @task
    def my_task(**context):
        print(context["dag_run"].partition_key)  # prints the partition key in the format `YYYY-MM-DD`

    my_task()

my_dag()

Combined partitioned asset schedules

You can combine multiple assets in a single PartitionedAssetTimetable instance to create a composite asset schedule, using the same logical expressions (AND (&) and OR (|)) as when creating a conditional asset schedule with regular assets.

from airflow.sdk import dag, task, PartitionedAssetTimetable, Asset

my_combined_asset_one = Asset("my_combined_asset_one")
my_combined_asset_two = Asset("my_combined_asset_two")


@dag(
    schedule=(
        PartitionedAssetTimetable(assets=(my_combined_asset_one & my_combined_asset_two))
    )
)
def my_combined_partitioned_asset_dag():
    @task
    def my_task(**context):
        partition_key = context["dag_run"].partition_key
        print(partition_key)

    my_task()


my_combined_partitioned_asset_dag()

If one of the assets is updated with a partitioned asset event, a pending run of this Dag is created. Pending runs are visible in the Airflow UI by clicking on the Dag's schedule.

Screenshot of the Airflow UI showing a pending run of a Dag with a composite asset schedule.

If the assets have been updated with partitioned asset events that carry different partition keys, several pending runs are created, one per partition key. Each pending run shows which assets have been updated for that specific partition key and which assets are still pending.

Screenshot of the Airflow UI showing multiple pending runs of a Dag with a composite asset schedule.

A run of this Dag is only triggered when both my_combined_asset_one and my_combined_asset_two are updated with a partitioned asset event sharing the same partition key. Note that queued partitioned asset events for other partition keys are not reset by this.
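The triggering rule can be sketched in plain Python. This is an illustration of the semantics described above, not actual scheduler code:

```python
# Queued partitioned asset events, modeled as (asset, partition_key) pairs.
queued = {
    ("my_combined_asset_one", "2026-03-16"),
    ("my_combined_asset_two", "2026-03-16"),
    ("my_combined_asset_one", "2026-03-17"),  # still pending: asset two missing
}
required_assets = {"my_combined_asset_one", "my_combined_asset_two"}

def runnable_keys(queued, required_assets):
    """Partition keys for which every required asset has a queued event."""
    keys = {key for _, key in queued}
    return {
        key for key in keys
        if required_assets <= {asset for asset, k in queued if k == key}
    }

print(runnable_keys(queued, required_assets))  # {'2026-03-16'}
```

Only the key shared by both assets triggers a run; the event queued for 2026-03-17 stays pending and is not reset.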

You can use different partition key mappers for each asset in the PartitionedAssetTimetable; see Partition key mappers.

Partition keys

Partition keys are strings attached to partitioned Dag runs and asset events. You can use them in tasks in a partitioned Dag run to partition data, for example in a SQL statement.

Accessing partition keys

The partition_key can be accessed inside any task in a partitioned Dag run from within the Airflow context or by using Jinja templating.

from airflow.sdk import task
from airflow.providers.standard.operators.bash import BashOperator

@task
def print_partition_key(**context):
    print(context["dag_run"].partition_key)


BashOperator(
    task_id="print_partition_key",
    bash_command="echo {{ dag_run.partition_key }}",
)

One of the most common use cases is to use the partition key in a SQL statement to partition the data used in a specific Dag run. For example, in a Dag that runs once a day, you can use the partition key to select the data for the previous day.

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

SQLExecuteQueryOperator(
    task_id="execute_query",
    conn_id="my_snowflake_conn",
    sql="""
    SELECT * FROM my_table
    WHERE
        my_timestamp >= DATEADD(day, -1, '{{ dag_run.partition_key }}'::DATE)
        AND my_timestamp < '{{ dag_run.partition_key }}'::DATE
    ;""",
)

Partition key mappers

Partition key mappers are used to modify the partition key of a Dag run. You can use them to change the grain of the partition key, to map composite keys segment by segment, or to validate that keys are in a fixed allow-list. The following partition key mappers are available:

  • IdentityMapper: keeps keys unchanged. Default mapper.
  • Temporal mappers change the grain of the partition key:
    • StartOfHourMapper: normalizes time keys to the hour in the format YYYY-MM-DDTHH (2026-03-16T09:37:51 -> 2026-03-16T09).
    • StartOfDayMapper: normalizes time keys to the day in the format YYYY-MM-DD (2026-03-16T09:37:51 -> 2026-03-16).
    • StartOfWeekMapper: normalizes time keys to the week in the format YYYY-MM-DD (W%V) (2026-03-16T09:37:51 -> 2026-03-16 (W12)).
    • StartOfMonthMapper: normalizes time keys to the month in the format YYYY-MM (2026-03-16T09:37:51 -> 2026-03).
    • StartOfQuarterMapper: normalizes time keys to the quarter in the format YYYY-Q<n> (2026-03-16T09:37:51 -> 2026-Q1). Quarters follow the calendar year: Q1 starts in January, Q2 in April, Q3 in July, and Q4 in October.
    • StartOfYearMapper: normalizes time keys to the year in the format YYYY (2026-03-16T09:37:51 -> 2026).
  • ProductMapper: maps composite keys segment by segment, applying one mapper per segment and then rejoining the mapped segments. For example, with the key Finance|2026-03-16T09:00:00, ProductMapper(IdentityMapper(), StartOfDayMapper()) produces Finance|2026-03-16. See Composite partition keys.
  • AllowedKeyMapper: validates that keys are in a fixed allow-list and passes the key through unchanged if valid. For example, AllowedKeyMapper(["Marketing", "Finance", "Sales"]) accepts only those department keys and rejects all others.
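As an illustration of what the temporal mappers do to a key, the transforms can be sketched in plain Python. These functions only mirror the normalizations listed above; the real mappers are Airflow classes:

```python
from datetime import datetime

# Plain-Python sketches of the temporal key normalizations; illustration
# only, not the Airflow mapper implementations.
def start_of_hour(key: str) -> str:
    return datetime.fromisoformat(key).strftime("%Y-%m-%dT%H")

def start_of_day(key: str) -> str:
    return datetime.fromisoformat(key).strftime("%Y-%m-%d")

def start_of_month(key: str) -> str:
    return datetime.fromisoformat(key).strftime("%Y-%m")

print(start_of_hour("2026-03-16T09:37:51"))   # 2026-03-16T09
print(start_of_day("2026-03-16T09:37:51"))    # 2026-03-16
print(start_of_month("2026-03-16T09:37:51"))  # 2026-03
```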

You can also change the default partition key mapper for all assets in the PartitionedAssetTimetable by providing a default_partition_mapper parameter.

from airflow.sdk import dag, PartitionedAssetTimetable, Asset, StartOfDayMapper

@dag(
    schedule=PartitionedAssetTimetable(
        assets=Asset("my_partitioned_asset"),
        default_partition_mapper=StartOfDayMapper(),
    )
)

To override the default partition key mapper for a specific asset, you can set the partition_mapper_config parameter of the PartitionedAssetTimetable instance to a dictionary of asset instances and partition key mappers.

from airflow.sdk import dag, PartitionedAssetTimetable, Asset, StartOfDayMapper, StartOfWeekMapper

@dag(
    schedule=PartitionedAssetTimetable(
        assets=Asset("my_partitioned_asset"),
        default_partition_mapper=StartOfDayMapper(),
        partition_mapper_config={
            Asset("my_partitioned_asset"): StartOfWeekMapper(),
        },
    )
)

You can use different partition key mappers for each asset in the PartitionedAssetTimetable.

from airflow.sdk import dag, PartitionedAssetTimetable, Asset, StartOfQuarterMapper, StartOfWeekMapper

my_combined_asset_one = Asset("my_combined_asset_one")
my_combined_asset_two = Asset("my_combined_asset_two")

@dag(
    schedule=PartitionedAssetTimetable(
        assets=(my_combined_asset_one & my_combined_asset_two),
        partition_mapper_config={
            my_combined_asset_one: StartOfQuarterMapper(),
            my_combined_asset_two: StartOfWeekMapper(),
        },
    )
)

When chaining several Dags with partitioned asset schedules, all Dags after the first in the chain need to use identical partition key mappers. For example, if a Dag uses a StartOfDayMapper while the next Dag in the chain uses a StartOfWeekMapper, the task producing to the next asset in the chain will fail.

Composite partition keys

Composite partition keys are partition keys that are composed of multiple segments, separated by | delimiters. For example, the partition key Finance|2026-03-16T09:00:00|Revenue is a composite partition key with three segments: Finance, 2026-03-16T09:00:00, and Revenue.

You can use the ProductMapper partition key mapper to map composite keys segment by segment, applying one mapper per segment and then rejoining the mapped segments. For example, with the key Finance|2026-03-16T09:00:00, ProductMapper(IdentityMapper(), StartOfDayMapper()) produces Finance|2026-03-16.

from airflow.sdk import dag, task, PartitionedAssetTimetable, Asset, ProductMapper, IdentityMapper, StartOfDayMapper, AllowedKeyMapper

@dag(
    schedule=PartitionedAssetTimetable(
        assets=Asset("my_partitioned_asset"),
        partition_mapper_config={
            Asset("my_partitioned_asset"): ProductMapper(
                IdentityMapper(), StartOfDayMapper(), AllowedKeyMapper(["Revenue", "ARR"])
            ),
        },
    )
)
def my_composite_dag():

    @task
    def my_task(**context):
        partition_key = context["dag_run"].partition_key
        print(partition_key)  # prints the partition key in the format `Finance|2026-03-16|Revenue`

    my_task()


my_composite_dag()

To trigger a Dag run, the given composite partition key must have the same number of segments as the ProductMapper instance has mappers, and each segment must be valid for its mapper. Invalid composite partition keys cause an error.
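The split-map-rejoin and validation semantics can be sketched in plain Python. This illustrates the behavior described above; it is not Airflow's ProductMapper implementation:

```python
from datetime import datetime

# Per-segment mappers, illustrating IdentityMapper, StartOfDayMapper, and
# AllowedKeyMapper semantics for a composite key.
def identity(seg: str) -> str:
    return seg

def start_of_day(seg: str) -> str:
    return datetime.fromisoformat(seg).strftime("%Y-%m-%d")

def allowed(options):
    def mapper(seg: str) -> str:
        if seg not in options:
            raise ValueError(f"segment {seg!r} not in allow-list {options}")
        return seg
    return mapper

def map_composite(key: str, mappers) -> str:
    # Split on the "|" delimiter, apply one mapper per segment, rejoin.
    segments = key.split("|")
    if len(segments) != len(mappers):
        raise ValueError(f"expected {len(mappers)} segments, got {len(segments)}")
    return "|".join(m(s) for m, s in zip(mappers, segments))

mappers = [identity, start_of_day, allowed(["Revenue", "ARR"])]
print(map_composite("Finance|2026-03-16T09:00:00|Revenue", mappers))
# Finance|2026-03-16|Revenue
```

A key with the wrong number of segments, or a segment outside the allow-list, raises an error instead of producing a mapped key.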