Skip to main content

Airflow sensors

Apache Airflow sensors are a special kind of operator that are designed to wait for something to happen. When sensors run, they check to see if a certain condition is met before they are marked successful and let their downstream tasks execute. When used properly, they can be a great tool for making your DAGs more event driven.

In this guide, you'll learn how sensors are used in Airflow, best practices for implementing sensors in production, and how to use deferrable versions of sensors.

Other ways to learn

There are multiple resources for learning about this topic. See also:

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Sensor basics

Sensors are a type of operator that checks if a condition is met at a specific interval. If the condition is met, the task is marked successful and the DAG can move to downstream tasks. If the condition isn't met, the sensor waits for another interval before checking again.

All sensors inherit from the BaseSensorOperator and have the following parameters:

  • mode: How the sensor operates. There are two types of modes:
    • poke: This is the default mode. When using poke, the sensor occupies a worker slot for the entire execution time and sleeps between pokes. This mode is best if you expect a short runtime for the sensor.
    • reschedule: When using this mode, if the criteria is not met then the sensor releases its worker slot and reschedules the next check for a later time. This mode is best if you expect a long runtime for the sensor, because it is less resource intensive and frees up workers for other tasks.
  • poke_interval: When using poke mode, this is the time in seconds that the sensor waits before checking the condition again. The default is 60 seconds.
  • exponential_backoff: When set to True, this setting creates exponentially longer wait times between pokes in poke mode.
  • timeout: The maximum amount of time in seconds that the sensor checks the condition. If the condition is not met within the specified period, the task fails.
  • soft_fail: If set to True, the task is marked as skipped if the condition is not met by the timeout.

Different types of sensors have different implementation details.

Commonly used sensors

Many Airflow provider packages contain sensors that wait for various criteria in different source systems. The following are some of the most commonly used sensors:

  • @task.sensor decorator: Allows you to turn any Python function that returns a PokeReturnValue into an instance of the BaseSensorOperator class. This way of creating a sensor is useful when checking for complex logic or if you are connecting to a tool via an API that has no specific sensor available.
  • S3KeySensor: Waits for a key (file) to appear in an Amazon S3 bucket. This sensor is useful if you want your DAG to process files from Amazon S3 as they arrive.
  • DateTimeSensor: Waits for a specified date and time. This sensor is useful if you want different tasks within the same DAG to run at different times.
  • ExternalTaskSensor: Waits for an Airflow task to be completed. This sensor is useful if you want to implement cross-DAG dependencies in the same Airflow environment.
  • HttpSensor: Waits for an API to be available. This sensor is useful if you want to ensure your API requests are successful.
  • SqlSensor: Waits for data to be present in a SQL table. This sensor is useful if you want your DAG to process data as it arrives in your database.

To review the available Airflow sensors, go to the Astronomer Registry.

Example implementation

The following example DAG shows how you might use the SqlSensor sensor:

from airflow.decorators import task, dag
from airflow.providers.common.sql.sensors.sql import SqlSensor

from typing import Dict
from pendulum import datetime


def _success_criteria(record):
return record


def _failure_criteria(record):
return True if not record else False


@dag(
description="DAG in charge of processing partner data",
start_date=datetime(2021, 1, 1),
schedule="@daily",
catchup=False,
)
def partner():
waiting_for_partner = SqlSensor(
task_id="waiting_for_partner",
conn_id="postgres",
sql="sql/CHECK_PARTNER.sql",
parameters={"name": "partner_a"},
success=_success_criteria,
failure=_failure_criteria,
fail_on_empty=False,
poke_interval=20,
mode="reschedule",
timeout=60 * 5,
)

@task
def validation() -> Dict[str, str]:
return {"partner_name": "partner_a", "partner_validation": True}

@task
def storing():
print("storing")

waiting_for_partner >> validation() >> storing()


partner()

This DAG waits for data to be available in a Postgres database before running validation and storing tasks. The SqlSensor runs a SQL query and is marked successful when that query returns data. Specifically, when the result is not in the set (0, '0', '', None). The SqlSensor task in the example DAG (waiting_for_partner) runs the CHECK_PARTNER.sql script every 20 seconds (the poke_interval) until the data is returned. The mode is set to reschedule, meaning between each 20 second interval the task will not take a worker slot. The timeout is set to 5 minutes, and the task fails if the data doesn't arrive within that time. When the SqlSensor criteria is met, the DAG moves to the downstream tasks. You can find the full code for this example in the webinar-sensors repo.

Sensor decorator / PythonSensor

If no sensor exists for your use case, you can create your own using either the @task.sensor decorator or the PythonSensor. The @task.sensor decorator returns a PokeReturnValue as an instance of the BaseSensorOperator. The PythonSensor takes a python_callable that returns True or False.

The following DAG shows how to use either the sensor decorator or the PythonSensor to create the same custom sensor:

"""
### Create a custom sensor using the @task.sensor decorator

This DAG showcases how to create a custom sensor using the @task.sensor decorator
to check the availability of an API.
"""

from airflow.decorators import dag, task
from pendulum import datetime
import requests

# importing the PokeReturnValue
from airflow.sensors.base import PokeReturnValue


@dag(start_date=datetime(2022, 12, 1), schedule="@daily", catchup=False)
def sensor_decorator():
# supply inputs to the BaseSensorOperator parameters in the decorator
@task.sensor(poke_interval=30, timeout=3600, mode="poke")
def check_shibe_availability() -> PokeReturnValue:
r = requests.get("http://shibe.online/api/shibes?count=1&urls=true")
print(r.status_code)

# set the condition to True if the API response was 200
if r.status_code == 200:
condition_met = True
operator_return_value = r.json()
else:
condition_met = False
operator_return_value = None
print(f"Shibe URL returned the status code {r.status_code}")

# the function has to return a PokeReturnValue
# if is_done = True the sensor will exit successfully, if
# is_done=False, the sensor will either poke or be rescheduled
return PokeReturnValue(is_done=condition_met, xcom_value=operator_return_value)

# print the URL to the picture
@task
def print_shibe_picture_url(url):
print(url)

print_shibe_picture_url(check_shibe_availability())


sensor_decorator()

Here, the @task.sensor decorates the check_shibe_availability() function, which checks if a given API returns a 200 status code. If the API returns a 200 status code, the sensor task is marked as successful. If any other status code is returned, the sensor pokes again after the poke_interval has passed.

The optional xcom_value parameter in PokeReturnValue defines what data will be pushed to XCom once is_done=true. You can use the data that was pushed to XCom in any downstream tasks.

Sensor best practices

When using sensors, keep the following in mind to avoid potential performance issues:

  • Always define a meaningful timeout parameter for your sensor. The default for this parameter is seven days, which is a long time for your sensor to be running. When you implement a sensor, consider your use case and how long you expect the sensor to wait and then define the sensor's timeout accurately.
  • Whenever possible and especially for long-running sensors, use the reschedule mode so your sensor is not constantly occupying a worker slot. This helps avoid deadlocks in Airflow where sensors take all of the available worker slots.
  • If your poke_interval is very short (less than about 5 minutes), use the poke mode. Using reschedule mode in this case can overload your scheduler.
  • Define a meaningful poke_interval based on your use case. There is no need for a task to check a condition every 60 seconds (the default) if you know the total amount of wait time will be 30 minutes.

Deferrable operators

Deferrable operators (sometimes referred to as asynchronous operators) eliminate the problem of having any operator or sensor using a full worker slot for the entire time they run. Many operators have a deferrable parameter that can be set to True to make the operator deferrable. For the sensors where this parameter is not available, deferrable versions exist in open source Airflow and in the Astronomer Providers package. Astronomer recommends using these in most cases to reduce resource costs.

For DAG authors, using deferrable sensors is no different from using regular sensors. All you need is to do is run a triggerer process in Airflow and either:

  • Set the Airflow config operators.default_deferrable to True to set all sensors with a deferrable parameter to be deferrable by default.
  • Set the deferrable parameter to True on individual sensor instances you want to run in deferrable mode.
  • Replace the name of a sensor with its deferrable counterpart if no deferrable parameter is available.

For more details, see Deferrable operators.

Was this page helpful?

Sign up for Developer Updates

Get a summary of new Astro features once a month.

You can unsubscribe at any time.
By proceeding you agree to our Privacy Policy, our Website Terms and to receive emails from Astronomer.