Deferrable operators

Deferrable operators leverage the Python asyncio library to efficiently run tasks waiting for an external resource to finish. This frees up your workers and allows you to use resources more effectively. In this guide, you’ll review deferrable operator concepts and learn how to use deferrable operators in your DAGs.

Other ways to learn

There are multiple resources for learning about this topic. See also:

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Terms and concepts

Review the following terms and concepts to gain a better understanding of deferrable operator functionality:

  • asyncio: A Python library used as the foundation for multiple asynchronous frameworks. This library is core to deferrable operator functionality, and is used when writing triggers.
  • Triggers: Small, asynchronous sections of Python code. Due to their asynchronous nature, they coexist efficiently in a single process known as the triggerer.
  • Triggerer: An Airflow service similar to a scheduler or a worker that runs an asyncio event loop in your Airflow environment. Running a triggerer is essential for using deferrable operators.
  • Deferred: An Airflow task state indicating that a task has paused its execution, released the worker slot, and submitted a trigger to be picked up by the triggerer process.

The terms deferrable, async, and asynchronous are used interchangeably and have the same meaning.
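To see why a single process can wait on many things at once, consider this minimal asyncio sketch (plain Python, independent of Airflow): one hundred simulated polls share a single event loop, so the total wait is roughly one poll interval rather than the sum of all of them. The `poll` helper and its timings are illustrative, not part of any Airflow API.

```python
import asyncio
import time


async def poll(job_id: int, interval: float) -> str:
    # Simulates one trigger awaiting an external condition without blocking the others
    await asyncio.sleep(interval)
    return f"job-{job_id} done"


async def main() -> list[str]:
    # 100 polls run concurrently on a single event loop,
    # similar to how triggers coexist in one triggerer process
    return await asyncio.gather(*(poll(i, 0.2) for i in range(100)))


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(f"{len(results)} polls finished in {elapsed:.2f}s")
```

Run sequentially, the same 100 polls would take about 20 seconds; on one event loop they finish in roughly 0.2 seconds.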

With traditional operators, a task submits a job to an external system such as a Spark cluster and then polls the job status until it is completed. Although the task isn’t doing significant work, it still occupies a worker slot during the polling process. As worker slots are occupied, tasks are queued and start times are delayed. The following image illustrates this process:

Classic Worker

With deferrable operators, worker slots are released when a task is polling for the job status. When the task is deferred, the polling process is offloaded as a trigger to the triggerer, and the worker slot becomes available. The triggerer can run many asynchronous polling tasks concurrently, and this prevents polling tasks from occupying your worker resources. When the terminal status for the job is received, the operator resumes the task, taking up a worker slot while it finishes. The following image illustrates the process:

Deferrable Worker

In Airflow 2.10+, some deferrable operators can enter a deferred state directly, without going to a worker first. See Triggering Deferral from Start.

There are numerous benefits to using deferrable operators including:

  • Reduced resource consumption: Depending on the available resources and the workload of your triggers, you can run hundreds to thousands of deferred tasks in a single triggerer process. This can lead to a reduction in the number of workers needed to run tasks during periods of high concurrency. With fewer workers needed, you can scale down the underlying infrastructure of your Airflow environment.
  • Resiliency against restarts: Triggers are stateless by design. This means your deferred tasks are not set to a failure state if a triggerer needs to be restarted due to a deployment or infrastructure issue. When a triggerer is back up and running in your environment, your deferred tasks will resume.

When you can’t use a deferrable operator for a long-running sensor task, for example because you can’t run a triggerer, Astronomer recommends using a sensor in reschedule mode to reduce unnecessary resource overhead. See the Airflow documentation for details about the differences between deferrable operators and sensors in reschedule mode.

Use deferrable operators

Deferrable operators should be used whenever you have tasks that occupy a worker slot while polling for a condition in an external system. For example, using deferrable operators for sensor tasks can provide efficiency gains and reduce operational costs.

Start a triggerer

To use deferrable operators, you must have a triggerer running in your Airflow environment. If you are running Airflow on Astro or using the Astro CLI, the triggerer runs automatically if you are on Astro Runtime 4.0 and later. If you are using Astronomer Software 0.26 and later, you can add a triggerer to an Airflow 2.2 and later deployment in the Deployment Settings tab. See Configure a Deployment on Astronomer Software - Triggerer to configure the triggerer.

If you are not using Astro, run airflow triggerer to start a triggerer process in your Airflow environment. Your output should look similar to the following image:

Triggerer Logs

When a task enters a deferred state, its trigger is registered in the triggerer. You can set the number of concurrent triggers that can run in a single triggerer process with the default_capacity configuration setting in Airflow. This config can also be set with the AIRFLOW__TRIGGERER__DEFAULT_CAPACITY environment variable. The default value is 1000.

Use deferrable versions of operators

Many Airflow operators, such as the TriggerDagRunOperator and the WasbBlobSensor, can be set to run in deferrable mode using the deferrable parameter. You can check if the operator you want to use has a deferrable parameter in the Astronomer Registry.

To always use the deferrable version of an operator if it’s available in Airflow 2.7+, set the Airflow config operators.default_deferrable to True. You can do so by defining the following environment variable in your Airflow environment:

AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=True

After you set the variable, all operators with a deferrable parameter will run as their deferrable version by default. You can override the config setting at the operator level using the deferrable parameter directly:

trigger_dag_run = TriggerDagRunOperator(
    task_id="task_in_downstream_dag",
    trigger_dag_id="downstream_dag",
    wait_for_completion=True,
    poke_interval=20,
    deferrable=False,  # turns off deferrable mode just for this operator instance
)

You can find a list of operators that support deferrable mode in the Airflow documentation.

Before the deferrable parameter was available in regular operators, deferrable operators were implemented as standalone operators, usually with an -Async suffix. Some of these operators are still available. For example, the DateTimeSensor does not have a deferrable parameter, but has a deferrable version called DateTimeSensorAsync.

The Astronomer providers package, which contained many -Async operators, is deprecated. The functionality from most of these operators is integrated into their original operator version in the relevant Airflow provider package.

Example workflow

The following example DAG is scheduled to run every minute between its start_date and its end_date. Every DAG run contains one sensor task that will potentially take up to 20 minutes to complete.

from airflow.decorators import dag
from airflow.sensors.date_time import DateTimeSensor
from pendulum import datetime


@dag(
    start_date=datetime(2024, 5, 23, 20, 0),
    end_date=datetime(2024, 5, 23, 20, 19),
    schedule="* * * * *",
    catchup=True,
)
def sync_dag_2():
    DateTimeSensor(
        task_id="sync_task",
        target_time="""{{ macros.datetime.utcnow() + macros.timedelta(minutes=20) }}""",
    )


sync_dag_2()

With DateTimeSensor, every running sensor takes up one worker slot. By using the deferrable version of this sensor, DateTimeSensorAsync, you can achieve full concurrency while freeing up your workers to complete additional tasks across your Airflow environment.

In the following screenshot, running the DAG produces 16 running task instances, each containing one active DateTimeSensor taking up one worker slot.

Standard sensor Grid View

Because Airflow imposes default limits on the number of active runs of the same DAG or number of active tasks in a DAG across all runs, you’ll have to scale up Airflow to concurrently run any other DAGs and tasks as described in the Scaling Airflow to optimize performance guide.
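For example, these limits can be raised through Airflow's core configuration settings. The values below are purely illustrative; choose numbers that match your infrastructure:

```shell
# Illustrative values - allow more concurrent DAG runs and tasks per DAG
export AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=32
export AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=64
```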

Switching out the DateTimeSensor for DateTimeSensorAsync still creates 16 running DAG runs, but the tasks in these DAG runs are in a deferred state, which does not take up worker slots. The only difference in the DAG code is using the deferrable operator DateTimeSensorAsync instead of DateTimeSensor:

from airflow.decorators import dag
from pendulum import datetime
from airflow.sensors.date_time import DateTimeSensorAsync


@dag(
    start_date=datetime(2024, 5, 23, 20, 0),
    end_date=datetime(2024, 5, 23, 20, 19),
    schedule="* * * * *",
    catchup=True,
)
def async_dag_2():
    DateTimeSensorAsync(
        task_id="async_task",
        target_time="""{{ macros.datetime.utcnow() + macros.timedelta(minutes=20) }}""",
    )


async_dag_2()

In the following screenshot, all tasks are shown in a deferred (violet) state. Tasks in other DAGs can use the available worker slots, making the deferrable operator more cost and time-efficient.

Deferrable sensor Grid View

High availability

Triggerers are designed to be highly available: you can run multiple triggerer processes at once. Similar to the HA scheduler, Airflow ensures that they coexist with correct locking and high availability. See High Availability for more information on this topic.
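As a sketch, adding capacity is as simple as starting another triggerer process on a separate host or container; the capacity value below is illustrative:

```shell
# Run on each additional host or container; Airflow handles locking between processes.
# --capacity caps how many triggers this process will run at one time.
airflow triggerer --capacity 500
```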

Create a deferrable operator

If an operator would benefit from being deferrable, but no deferrable version exists in OSS Airflow yet, you can create your own by writing a deferrable operator and trigger class. You can also defer a task several times within one operator if needed.

Get a template for a custom deferrable operator and a custom trigger class by clicking the dropdown below. Make sure to adjust the classpath in your trigger’s .serialize method (currently include.deferrable_operator_template.MyTrigger) to match your file structure.

Click to view the template code
from __future__ import annotations

import asyncio
import time
from typing import Any, AsyncIterator, Sequence

from asgiref.sync import sync_to_async

from airflow.configuration import conf
from airflow.models.baseoperator import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils.context import Context


class MyTrigger(BaseTrigger):
    """
    This is an example of a custom trigger that waits for a binary random choice
    between 0 and 1 to be 1.

    Args:
        poll_interval (int): How many seconds to wait between async polls.
        my_kwarg_passed_into_the_trigger (str): A kwarg that is passed into the trigger.

    Returns:
        my_kwarg_passed_out_of_the_trigger (str): A kwarg that is passed out of the trigger.
    """

    def __init__(
        self,
        poll_interval: int = 60,
        my_kwarg_passed_into_the_trigger: str = "notset",
        my_kwarg_passed_out_of_the_trigger: str = "notset",
        # you can add more arguments here
    ):
        super().__init__()
        self.poll_interval = poll_interval
        self.my_kwarg_passed_into_the_trigger = my_kwarg_passed_into_the_trigger
        self.my_kwarg_passed_out_of_the_trigger = my_kwarg_passed_out_of_the_trigger

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize MyTrigger arguments and classpath.
        All arguments must be JSON serializable.
        This will be returned by the trigger when it is complete and passed as `event` to the
        `execute_complete` method of the deferrable operator.
        """
        return (
            "include.deferrable_operator_template.MyTrigger",  # this is the classpath for the trigger
            {
                "poll_interval": self.poll_interval,
                "my_kwarg_passed_into_the_trigger": self.my_kwarg_passed_into_the_trigger,
                "my_kwarg_passed_out_of_the_trigger": self.my_kwarg_passed_out_of_the_trigger,
                # you can add more kwargs here
            },
        )

    # The run method is an async generator that yields a TriggerEvent when the desired condition is met
    async def run(self) -> AsyncIterator[TriggerEvent]:
        while True:
            # my_trigger_function is awaited and is where the condition is checked
            result = await self.my_trigger_function()
            if result == 1:
                self.log.info("Result was 1, that's the number! Triggering event.")
                self.log.info(
                    f"Kwarg passed in was: {self.my_kwarg_passed_into_the_trigger}"
                )
                # This is how you pass data out of the trigger: by setting attributes that get serialized
                self.my_kwarg_passed_out_of_the_trigger = "apple"
                self.log.info(
                    f"Kwarg to be passed out is: {self.my_kwarg_passed_out_of_the_trigger}"
                )
                # Fire the trigger event! This gets a worker to execute the operator's `execute_complete` method
                yield TriggerEvent(self.serialize())
                return  # The return statement prevents the trigger from running again
            else:
                self.log.info(
                    f"Result was not the one we are waiting for. Sleeping for {self.poll_interval} seconds."
                )
                # If the condition is not met, the trigger sleeps for the poll_interval.
                # This code can run multiple times until the condition is met.
                await asyncio.sleep(self.poll_interval)

    # This is the function that is awaited in the run method
    @sync_to_async
    def my_trigger_function(self) -> int:
        """
        This is where what you are waiting for goes. For example, a call to an
        API to check the state of a cloud resource.
        This code can run multiple times until the condition is met.
        """
        import random

        randint = random.choice([0, 1])
        self.log.info(f"Random number: {randint}")
        return randint


class MyOperator(BaseOperator):
    """
    Deferrable operator that waits for a binary random choice between 0 and 1 to be 1.

    Args:
        wait_for_completion (bool): Whether to wait for the trigger to complete.
        poke_interval (int): How many seconds to wait between polls,
            in both deferrable and sensor mode.
        deferrable (bool): Whether to defer the operator. If set to False,
            the operator acts as a sensor.

    Returns:
        str: A kwarg that is passed through the trigger and returned by the operator.
    """

    template_fields: Sequence[str] = (
        "wait_for_completion",
        "poke_interval",
    )
    ui_color = "#73deff"

    def __init__(
        self,
        *,
        # you can add more arguments here
        wait_for_completion: bool = False,
        poke_interval: int = 60,
        # this default is a convention that allows setting the operator to deferrable
        # in the config using AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=True
        deferrable: bool = conf.getboolean(
            "operators", "default_deferrable", fallback=False
        ),
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.wait_for_completion = wait_for_completion
        self.poke_interval = poke_interval
        self._defer = deferrable

    def execute(self, context: Context):
        # Add code you want to be executed before the deferred part here (this code only runs once)

        # turns the operator into a sensor/deferrable operator
        if self.wait_for_completion:
            # Starting the deferral process
            if self._defer:
                self.log.info(
                    "Operator in deferrable mode. Starting the deferral process."
                )
                self.defer(
                    trigger=MyTrigger(
                        poll_interval=self.poke_interval,
                        my_kwarg_passed_into_the_trigger="lemon",
                        # you can pass information into the trigger here
                    ),
                    method_name="execute_complete",
                    kwargs={"kwarg_passed_to_execute_complete": "tomato"},
                    # kwargs get passed through to the execute_complete method
                )
            else:  # regular sensor part
                while True:
                    self.log.info("Operator in sensor mode. Polling.")
                    time.sleep(self.poke_interval)
                    import random

                    # This is where you would check for the condition you are waiting for
                    # when using the operator as a regular sensor.
                    # This code can run multiple times until the condition is met.
                    randint = random.choice([0, 1])
                    self.log.info(f"Random number: {randint}")
                    if randint == 1:
                        self.log.info("Result was 1, that's the number! Continuing.")
                        return randint
                    self.log.info(
                        "Result was not the one we are waiting for. Sleeping."
                    )
        else:
            self.log.info("Not waiting for completion.")

        # Add code you want to be executed after the deferred part here (this code only runs once)
        # you can have as many deferred parts as you want in an operator

    def execute_complete(
        self,
        context: Context,
        event: tuple[str, dict[str, Any]],
        kwarg_passed_to_execute_complete: str,  # make sure to add the kwargs you want to pass through
    ):
        """Execute when the trigger is complete. This code only runs once."""
        self.log.info("Trigger is complete.")
        self.log.info(f"Event: {event}")  # printing the serialized event

        # you can push additional data to XCom here
        context["ti"].xcom_push(
            "message_from_the_trigger", event[1]["my_kwarg_passed_out_of_the_trigger"]
        )

        # the returned value gets pushed to XCom as `return_value`
        return kwarg_passed_to_execute_complete

Note that when developing a custom trigger, you need to restart your triggerer to pick up any changes you make, since the triggerer caches the trigger classes. Additionally, all information you pass between the triggerer and the worker must be JSON serializable.
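A quick way to check that constraint in plain Python: everything in the kwargs dict of a trigger's serialize() output must survive a JSON round-trip. The classpath and key names below are hypothetical placeholders:

```python
import json

# Shape of a trigger's serialize() return value: (classpath, kwargs)
event = (
    "include.my_module.MyTrigger",  # hypothetical classpath
    {"poll_interval": 60, "job_id": "abc-123"},
)

# JSON-serializable kwargs round-trip cleanly
restored = json.loads(json.dumps(event[1]))
print(restored == event[1])

# Objects such as datetimes, connections, or hooks would raise
# a TypeError in json.dumps and cannot be passed through a trigger as-is
```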

See Writing Deferrable Operators for more information.

Airflow 2.10 includes the option to defer a task directly, without it ever being picked up by a worker. See the code below for a deferrable operator circumventing the .execute() method. When using this template, make sure to adjust the classpath for your trigger (currently include.custom_deferrable_operator.MyTrigger) in both the .serialize method and the StartTriggerArgs to match your file structure.

Click to view the template code
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator, Sequence

from asgiref.sync import sync_to_async

from airflow.configuration import conf
from airflow.models.baseoperator import BaseOperator
from airflow.triggers.base import BaseTrigger, StartTriggerArgs, TriggerEvent
from airflow.utils.context import Context


class MyTrigger(BaseTrigger):
    """
    This is an example of a custom trigger that waits for a binary random choice
    between 0 and 1 to be 1.

    Args:
        poll_interval (int): How many seconds to wait between async polls.
        my_kwarg_passed_into_the_trigger (str): A kwarg that is passed into the trigger.

    Returns:
        my_kwarg_passed_out_of_the_trigger (str): A kwarg that is passed out of the trigger.
    """

    def __init__(
        self,
        poll_interval: int = 60,
        my_kwarg_passed_into_the_trigger: str = "notset",
        my_kwarg_passed_out_of_the_trigger: str = "notset",
        # you can add more arguments here
    ):
        super().__init__()
        self.poll_interval = poll_interval
        self.my_kwarg_passed_into_the_trigger = my_kwarg_passed_into_the_trigger
        self.my_kwarg_passed_out_of_the_trigger = my_kwarg_passed_out_of_the_trigger

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize MyTrigger arguments and classpath.
        All arguments must be JSON serializable.
        This will be returned by the trigger when it is complete and passed as `event` to the
        `execute_complete` method of the deferrable operator.
        """
        return (
            "include.custom_deferrable_operator.MyTrigger",  # this is the classpath for the trigger
            {
                "poll_interval": self.poll_interval,
                "my_kwarg_passed_into_the_trigger": self.my_kwarg_passed_into_the_trigger,
                "my_kwarg_passed_out_of_the_trigger": self.my_kwarg_passed_out_of_the_trigger,
                # you can add more kwargs here
            },
        )

    # The run method is an async generator that yields a TriggerEvent when the desired condition is met
    async def run(self) -> AsyncIterator[TriggerEvent]:
        while True:
            # my_trigger_function is awaited and is where the condition is checked
            result = await self.my_trigger_function()
            if result == 1:
                self.log.info("Result was 1, that's the number! Triggering event.")
                self.log.info(
                    f"Kwarg passed in was: {self.my_kwarg_passed_into_the_trigger}"
                )
                # This is how you pass data out of the trigger: by setting attributes that get serialized
                self.my_kwarg_passed_out_of_the_trigger = "apple"
                self.log.info(
                    f"Kwarg to be passed out is: {self.my_kwarg_passed_out_of_the_trigger}"
                )
                # Fire the trigger event! This gets a worker to execute the operator's `execute_complete` method
                yield TriggerEvent(self.serialize())
                return  # The return statement prevents the trigger from running again
            else:
                self.log.info(
                    f"Result was not the one we are waiting for. Sleeping for {self.poll_interval} seconds."
                )
                # If the condition is not met, the trigger sleeps for the poll_interval.
                # This code can run multiple times until the condition is met.
                await asyncio.sleep(self.poll_interval)

    # This is the function that is awaited in the run method
    @sync_to_async
    def my_trigger_function(self) -> int:
        """
        This is where what you are waiting for goes. For example, a call to an
        API to check the state of a cloud resource.
        This code can run multiple times until the condition is met.
        """
        import random

        randint = random.choice([0, 1])
        self.log.info(f"Random number: {randint}")
        return randint


class MyDeferrableOperator(BaseOperator):
    """
    Deferrable operator that waits for a binary random choice between 0 and 1 to be 1.

    Args:
        wait_for_completion (bool): Whether to wait for the trigger to complete.
        poke_interval (int): How many seconds to wait between polls,
            in both deferrable and sensor mode.
        deferrable (bool): Whether to defer the operator. If set to False,
            the operator acts as a sensor.

    Returns:
        str: A kwarg that is passed through the trigger and returned by the operator.
    """

    template_fields: Sequence[str] = (
        "wait_for_completion",
        "poke_interval",
    )
    ui_color = "#73deff"

    # --------------------------------------------------------- #
    # New implementation directly starting the trigger - Part 1 #
    # --------------------------------------------------------- #

    start_trigger_args = StartTriggerArgs(
        trigger_cls="include.custom_deferrable_operator.MyTrigger",
        trigger_kwargs={
            "poll_interval": 60,
            "my_kwarg_passed_into_the_trigger": "lemon",
        },
        next_method="execute_complete",
        next_kwargs={"kwarg_passed_to_execute_complete": "tomato"},
        timeout=None,
    )
    start_from_trigger = True

    def __init__(
        self,
        *,
        # you can add more arguments here
        wait_for_completion: bool = False,
        poke_interval: int = 60,
        # this default is a convention that allows setting the operator to deferrable
        # in the config using AIRFLOW__OPERATORS__DEFAULT_DEFERRABLE=True
        deferrable: bool = conf.getboolean(
            "operators", "default_deferrable", fallback=False
        ),
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.wait_for_completion = wait_for_completion
        self.poke_interval = poke_interval
        self._defer = deferrable

        # --------------------------------------------------------- #
        # New implementation directly starting the trigger - Part 2 #
        # --------------------------------------------------------- #

        self.start_trigger_args.trigger_kwargs = dict(
            poll_interval=self.poke_interval,
            my_kwarg_passed_into_the_trigger="lemon",
        )

    def execute_complete(
        self,
        context: Context,
        event: tuple[str, dict[str, Any]],
        kwarg_passed_to_execute_complete: str,  # make sure to add the kwargs you want to pass through
    ):
        """Execute when the trigger is complete. This code only runs once."""
        self.log.info("Trigger is complete.")
        self.log.info(f"Event: {event}")  # printing the serialized event

        # you can push additional data to XCom here
        context["ti"].xcom_push(
            "message_from_the_trigger", event[1]["my_kwarg_passed_out_of_the_trigger"]
        )

        # the returned value gets pushed to XCom as `return_value`
        return kwarg_passed_to_execute_complete

See Triggering Deferral from Start for more details and code examples.