Event-driven scheduling
Event-driven scheduling is a sub-type of data-aware scheduling where a DAG is triggered when messages are posted to a message queue. This is useful for scenarios where you want to trigger a DAG based on events that occur outside of Airflow, such as data delivery to an external system or IoT sensor events, and is key to inference execution pipelines.
In this guide you will learn about the concepts used in event-driven scheduling, common usage patterns, and how to implement an example using Amazon SQS and an example using Apache Kafka.
Assumed knowledge
To get the most out of this guide, you should have existing knowledge of:
- Airflow assets. See Assets and data-aware scheduling in Airflow.
Concepts
There are a number of concepts that are important to understand when using event-driven scheduling.
- Data-aware scheduling: Data-aware or data-driven scheduling refers to all ways you can schedule a DAG based on updates to assets. Updates to assets outside of event-driven scheduling occur by tasks in the same Airflow instance completing successfully, manually through the Airflow UI, or through a call to the Airflow REST API. See Assets and data-aware scheduling in Airflow.
- Event-driven scheduling: Event-driven scheduling is a sub-type of data-aware scheduling where a DAG is run based on messages posted to a message queue. This message in the queue is triggered by an event that occurs outside of Airflow.
- Message queue: A message queue is a service that allows you to send and receive messages between different systems. Examples of message queues include Amazon SQS, RabbitMQ, and Apache Kafka. Event-driven scheduling in Airflow 3.0 is currently supported for Amazon SQS and Apache Kafka, with support for other message queues planned for future releases.
- Trigger: A trigger is an asynchronous Python function running in the Airflow triggerer component. Triggers that inherit from `BaseEventTrigger` can be used in AssetWatchers for event-driven scheduling. The trigger is responsible for polling the message queue for new messages; when a new message is found, a `TriggerEvent` is created and the message is deleted from the queue.
- AssetWatcher: An AssetWatcher is a class in Airflow that watches one or more triggers for events. When a trigger fires a `TriggerEvent`, the AssetWatcher updates the asset it is associated with, creating an `AssetEvent`. The payload of the trigger is attached to the `AssetEvent` in its `extra` dictionary.
- AssetEvent: An `Asset` is an object in Airflow that represents a concrete or abstract data entity. For example, an asset can be a file, a table in a database, or not tied to any specific data. An `AssetEvent` represents one update to an asset. In the context of event-driven scheduling, the `AssetEvent` represents one message having been detected in the message queue.
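To make the relationship between these objects concrete, the following sketch wires a generic `MessageQueueTrigger` into an `AssetWatcher` attached to an `Asset` that a DAG is scheduled on. The import paths and the placeholder queue URI are assumptions based on the Airflow 3 SDK and the Common Messaging provider; complete, runnable examples for Amazon SQS and Apache Kafka follow later in this guide.

```python
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag

# Trigger: polls the message queue and fires a TriggerEvent per message
trigger = MessageQueueTrigger(queue="<your queue URI>")

# AssetWatcher: turns each TriggerEvent into an AssetEvent on the asset below
my_asset = Asset("my_asset", watchers=[AssetWatcher(name="my_watcher", trigger=trigger)])


# Any DAG scheduled on the asset runs once per AssetEvent
@dag(schedule=[my_asset])
def my_event_driven_dag(): ...


my_event_driven_dag()
```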
When to use event-driven scheduling
Basic and advanced data-aware scheduling are great for use cases where updates to assets occur within Airflow or can be accomplished via a call to the Airflow REST API. However, there are scenarios where you need a DAG to run based on events in external systems. Two common patterns exist for event-driven scheduling:
- Data delivery to an external system: Data is delivered to an external system, such as manually by a domain expert, and a data-ready event is sent to a message queue. The DAG in Airflow is scheduled based on this message event and runs an extract-transform-load (ETL) pipeline that processes the data in the external system.
- IoT sensor events: An Internet of Things (IoT) device sends a sensor event to a message queue. A DAG in Airflow is scheduled based on this message event and consumes the message to evaluate the sensor value. If the evaluation determines that an alert is warranted, an alert event is published to another message queue.
One common use case for event-driven scheduling is inference execution, where the DAG that is triggered involves a call to a machine learning model. Airflow can be used to orchestrate inference execution pipelines of all types, including in Generative AI applications.
A key change enabling inference execution in Airflow 3.0 is that a DAG can be triggered with `None` provided as the `logical_date`, meaning simultaneous triggering of multiple DAG runs is possible.
Info
Currently, the `MessageQueueTrigger` works with Amazon SQS and Apache Kafka out of the box, with support for other message queues planned for future releases. You can create your own triggers to use with AssetWatchers by inheriting from the `BaseEventTrigger` class. See the Airflow documentation for more information on supported triggers for event-driven scheduling.
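As a rough sketch of what a custom trigger could look like, the class below polls a hypothetical queue and yields a `TriggerEvent` per message. The `BaseEventTrigger` import path, the `include.custom_triggers` module path in `serialize`, and the `_poll_queue` helper are assumptions for illustration only; check the Airflow documentation for the exact trigger interface in your version.

```python
import asyncio
from typing import Any, AsyncIterator

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class MyQueueTrigger(BaseEventTrigger):
    """Hypothetical trigger that polls an external queue for new messages."""

    def __init__(self, queue_url: str, poll_interval: float = 30.0) -> None:
        super().__init__()
        self.queue_url = queue_url
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Classpath and kwargs the triggerer uses to recreate this trigger
        return (
            "include.custom_triggers.MyQueueTrigger",
            {"queue_url": self.queue_url, "poll_interval": self.poll_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        while True:
            message = await self._poll_queue()  # your async polling and delete logic
            if message is not None:
                # The payload is attached to the resulting AssetEvent's extra dictionary
                yield TriggerEvent({"payload": message})
            await asyncio.sleep(self.poll_interval)

    async def _poll_queue(self) -> dict | None:
        # Placeholder: fetch (and delete) one message from your queue, or return None
        return None
```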
Example: Amazon SQS
This example shows how to configure a DAG to run as soon as a message is posted to an Amazon SQS queue.
- Create an Amazon SQS queue. See the Amazon Simple Queue Service Documentation for instructions.
- Add the Airflow Common Messaging provider and the Airflow Amazon provider to your Airflow instance. When using the Astro CLI, you can add the providers to your `requirements.txt` file:
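Assuming you want the latest compatible versions of both providers, the entries could look like the following; pin versions as needed for your deployment.

```text
apache-airflow-providers-common-messaging
apache-airflow-providers-amazon
```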
- Set the connection to your Amazon SQS queue in your Airflow instance. Note that the connection needs to include the `region_name` in the `extra` field. Replace `<your access key>`, `<your secret key>`, and `<your region>` with your AWS credentials and region. For other authentication options, see the Airflow Amazon provider documentation. Your AWS user requires at least `sqs:ReceiveMessage` and `sqs:DeleteMessage` permissions.
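One way to define this connection, shown here as a sketch using an environment variable in your project's `.env` file and assuming the default `aws_default` connection ID:

```text
AIRFLOW_CONN_AWS_DEFAULT='{"conn_type": "aws", "login": "<your access key>", "password": "<your secret key>", "extra": {"region_name": "<your region>"}}'
```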
- Create a new file in the `dags` folder of your Airflow project and add the following code. Replace the `SQS_QUEUE` with the URL of your message queue.
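A minimal sketch of such a DAG is shown below. The import paths (the Common Messaging provider and the Airflow 3 `airflow.sdk` module), the asset, watcher, and DAG names, and reading the payload from `triggering_asset_events` are assumptions to verify against the Airflow documentation for your installed versions.

```python
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, task

# Replace with the URL of your SQS queue
SQS_QUEUE = "https://sqs.<your region>.amazonaws.com/<your account id>/<your queue name>"

# The trigger polls the SQS queue for new messages
trigger = MessageQueueTrigger(queue=SQS_QUEUE)

# The asset is updated every time the watcher's trigger fires an event
sqs_queue_asset = Asset(
    "sqs_queue_asset",
    watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)],
)


@dag(schedule=[sqs_queue_asset])
def sqs_event_driven_dag():
    @task
    def process_message(**context):
        # The message payload is attached to the AssetEvent in its extra dictionary
        for asset_events in context["triggering_asset_events"].values():
            for event in asset_events:
                print(event.extra)

    process_message()


sqs_event_driven_dag()
```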
Example: Apache Kafka
This example shows how to configure a DAG to run as soon as a message is posted to an Apache Kafka topic.
- Set the connection to your Apache Kafka topic in your Airflow instance. Below is an example connection JSON; depending on your Kafka setup, you may need to provide additional values in the `extra` dictionary.
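A sketch of what this connection JSON could look like, assuming a local unauthenticated broker and the default `kafka_default` connection ID; the consumer settings in `extra` are assumptions to adapt to your environment.

```json
{
    "conn_type": "kafka",
    "extra": {
        "bootstrap.servers": "<your bootstrap servers>",
        "group.id": "<your consumer group>",
        "security.protocol": "PLAINTEXT",
        "auto.offset.reset": "beginning"
    }
}
```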
- Create a new file in the `dags` folder of your Airflow project and add the following code. Replace the `KAFKA_TOPIC` with the name of your Kafka topic.
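A minimal sketch of such a DAG, again using the `MessageQueueTrigger`. The `kafka://<bootstrap server>/<topic>` queue URI format, the bootstrap server placeholder, and the asset and DAG names are assumptions to verify against the provider documentation for your installed version.

```python
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, task

# Replace with the name of your Kafka topic
KAFKA_TOPIC = "<your topic name>"

# Assumed queue URI format: kafka://<bootstrap server>/<topic>
trigger = MessageQueueTrigger(queue=f"kafka://<your bootstrap server>/{KAFKA_TOPIC}")

# The asset is updated every time a new message is detected on the topic
kafka_topic_asset = Asset(
    "kafka_topic_asset",
    watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)],
)


@dag(schedule=[kafka_topic_asset])
def kafka_event_driven_dag():
    @task
    def process_message(**context):
        # The message payload is attached to the AssetEvent in its extra dictionary
        for asset_events in context["triggering_asset_events"].values():
            for event in asset_events:
                print(event.extra)

    process_message()


kafka_event_driven_dag()
```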