Event-driven scheduling
Event-driven scheduling is a sub-type of data-aware scheduling where a DAG is triggered when messages are posted to a message queue. This is useful for scenarios where you want to trigger a DAG based on events that occur outside of Airflow, such as data delivery to an external system or IoT sensor events, and is key to inference execution pipelines.
In this guide you will learn about the concepts used in event-driven scheduling, common usage patterns, and how to implement an example using Amazon SQS and an example using Apache Kafka.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Airflow assets. See Assets and data-aware scheduling in Airflow.
 
Concepts
There are a number of concepts that are important to understand when using event-driven scheduling.
- Data-aware scheduling: Data-aware or data-driven scheduling refers to all ways you can schedule a DAG based on updates to assets. Updates to assets outside of event-driven scheduling occur by tasks in the same Airflow instance completing successfully, manually through the Airflow UI, or through a call to the Airflow REST API. See Assets and data-aware scheduling in Airflow.
 - Event-driven scheduling: Event-driven scheduling is a sub-type of data-aware scheduling where a DAG is run based on messages posted to a message queue. This message in the queue is triggered by an event that occurs outside of Airflow.
- Message queue: A message queue is a service that allows you to send and receive messages between different systems. Examples of message queues include Amazon SQS, RabbitMQ, and Apache Kafka. Event-driven scheduling in Airflow 3.0 is supported for Amazon SQS and Apache Kafka, with support for other message queues planned for future releases.
- Trigger: A trigger is an asynchronous Python function running in the Airflow triggerer component. Triggers that inherit from BaseEventTrigger can be used in AssetWatchers for event-driven scheduling. The trigger is responsible for polling the message queue for new messages. When a new message is found, a TriggerEvent is created and the message is deleted from the queue.
- AssetWatcher: An AssetWatcher is a class in Airflow that watches one or more triggers for events. When a trigger fires a TriggerEvent, the AssetWatcher updates the asset it is associated with, creating an AssetEvent. The payload of the trigger is attached to the AssetEvent in its extra dictionary.
- Asset and AssetEvent: An Asset is an object in Airflow that represents a concrete or abstract data entity. For example, an asset can be a file, a table in a database, or not tied to any specific data. An AssetEvent represents one update to an asset. In the context of event-driven scheduling, the AssetEvent represents one message having been detected in the message queue.
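The following minimal sketch shows how these concepts fit together. The queue URI, asset name, and watcher name are illustrative placeholders, and the import paths shown are those of the Common Messaging provider, which may differ between provider versions.

```python
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher

# A trigger inheriting from BaseEventTrigger polls the message queue.
trigger = MessageQueueTrigger(queue="<your queue URI>")

# The AssetWatcher links the trigger to an asset: each TriggerEvent becomes an
# AssetEvent on the asset, with the trigger payload stored in the event's extra.
my_asset = Asset("my_asset", watchers=[AssetWatcher(name="my_watcher", trigger=trigger)])

# Any DAG defined with schedule=[my_asset] runs whenever such an AssetEvent occurs.
```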
When to use event-driven scheduling
Basic and advanced data-aware scheduling are great for use cases where updates to assets occur within Airflow or can be accomplished via a call to the Airflow REST API. However, there are scenarios where you need a DAG to run based on events in external systems. Two common patterns exist for event-driven scheduling:
- Data delivery to an external system: Data is delivered to an external system, such as manually by a domain expert, and a data-ready event is sent to a message queue. The DAG in Airflow is scheduled based on this message event and runs an extract-transform-load (ETL) pipeline that processes the data in the external system.
- IoT sensor events: An Internet of Things (IoT) device sends a sensor event to a message queue. A DAG in Airflow is scheduled based on this message event and consumes the message to evaluate the sensor value. If the evaluation determines that an alert is warranted, an alert event is published to another message queue.

One common use case for event-driven scheduling is inference execution, where the DAG that is triggered involves a call to a machine learning model. Airflow can be used to orchestrate inference execution pipelines of all types, including in Generative AI applications.
A key change enabling inference execution in Airflow 3.0 is that a DAG can be triggered with None provided as the logical_date, which makes it possible to trigger multiple runs of the same DAG simultaneously.
Info
Currently, the MessageQueueTrigger works with Amazon SQS and Apache Kafka out of the box, with support for other message queues planned for future releases. You can create your own triggers to use with AssetWatchers by inheriting from the BaseEventTrigger class. See the Airflow documentation for more information on supported triggers for event-driven scheduling.
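As a rough illustration of this extension point, the sketch below outlines what a custom trigger can look like. The class, its parameters, and the polling helper are hypothetical, and you should verify the trigger interface against the Airflow version you are running.

```python
import asyncio

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class MyQueueTrigger(BaseEventTrigger):
    """Hypothetical trigger that polls an external queue for new messages."""

    def __init__(self, queue_url: str, poll_interval: float = 30.0):
        super().__init__()
        self.queue_url = queue_url
        self.poll_interval = poll_interval

    def serialize(self):
        # Tell the triggerer how to recreate this trigger: class path plus kwargs.
        return (
            "my_package.triggers.MyQueueTrigger",
            {"queue_url": self.queue_url, "poll_interval": self.poll_interval},
        )

    async def run(self):
        # Poll the external queue and yield one TriggerEvent per message found.
        while True:
            message = await self._poll_queue()
            if message is not None:
                yield TriggerEvent({"payload": message})
            await asyncio.sleep(self.poll_interval)

    async def _poll_queue(self):
        # Placeholder for your own asynchronous polling logic.
        return None
```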
Example: Amazon SQS
This example shows how to configure a DAG to run as soon as a message is posted to an Amazon SQS queue.
- Create an Amazon SQS queue. See the Amazon Simple Queue Service Documentation for instructions.
- Add the Airflow Common Messaging provider (apache-airflow-providers-common-messaging) and the Airflow Amazon provider (apache-airflow-providers-amazon) to your Airflow instance. When using the Astro CLI, you can add these providers to your requirements.txt file.
- Set the connection to your Amazon SQS queue in your Airflow instance. Note that the connection needs to include the region_name in the extra field, along with your AWS access key and secret key. For other authentication options, see the Airflow Amazon provider documentation. Your AWS user requires at least sqs:ReceiveMessage and sqs:DeleteMessage permissions.
- Create a new file in the dags folder of your Airflow project and add your DAG code; a sketch of what this DAG can look like is shown after this list. Replace SQS_QUEUE with the URL of your message queue. This DAG is scheduled to run as soon as the sqs_queue_asset asset is updated. The asset uses one AssetWatcher with the name sqs_watcher that watches one MessageQueueTrigger, which polls the provided SQS queue for new messages. The process_message task gets the triggering asset events from the Airflow context and prints the message body from the triggering message. The process_message task is a placeholder for your own task that processes the message.
- Create a new message in the SQS queue. This triggers the DAG to run, and the process_message task prints the message body.
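The DAG code referenced in the steps above is not reproduced verbatim here; the following is a minimal sketch of what it can look like, assuming an AWS connection with the default connection ID aws_default. The structure of the event payload in extra can vary between provider versions, so the placeholder task prints the full extra dictionary rather than extracting specific keys.

```python
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, task

# Replace with the URL of your SQS queue.
SQS_QUEUE = "https://sqs.<region>.amazonaws.com/<account_id>/<queue_name>"

# The trigger polls the SQS queue for new messages.
trigger = MessageQueueTrigger(queue=SQS_QUEUE)

# The asset is updated whenever the watcher's trigger detects a new message.
sqs_queue_asset = Asset(
    "sqs_queue_asset",
    watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)],
)


@dag(schedule=[sqs_queue_asset])
def sqs_event_driven_dag():
    @task
    def process_message(**context):
        # Placeholder task: print the payload of every triggering asset event.
        # The message body is contained in the event's extra dictionary; inspect
        # the printed output to see the exact keys in your provider version.
        for asset, events in context["triggering_asset_events"].items():
            for event in events:
                print(f"Asset {asset} was updated with extra: {event.extra}")

    process_message()


sqs_event_driven_dag()
```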
Example: Apache Kafka
To use Apache Kafka as the message queue for event-driven scheduling, follow these steps:
- Create an Apache Kafka topic. See the Apache Kafka documentation for instructions.
- Add the Airflow Common Messaging provider (apache-airflow-providers-common-messaging) and the Airflow Apache Kafka provider (apache-airflow-providers-apache-kafka) to your Airflow instance. When using the Astro CLI, you can add these providers to your requirements.txt file.
- Set the connection to your Apache Kafka topic in your Airflow instance. The Kafka configuration is provided in the extra dictionary of the connection; depending on your Kafka setup, you may need to provide additional values there.
- Create a new file in the dags folder of your Airflow project and add your DAG code; a sketch of what this DAG can look like is shown after this list. Replace KAFKA_TOPIC with the name of your Kafka topic. This DAG is scheduled to run as soon as the kafka_topic_asset asset is updated. The asset uses one AssetWatcher with the name kafka_watcher that watches one MessageQueueTrigger, which polls the provided Kafka topic for new messages. The process_message task gets the triggering asset events from the Airflow context and prints the event information from the trigger event, which includes the message. The process_message task is a placeholder for your own task that processes the message.
- Create a new message in the Kafka topic. This triggers the DAG to run, and the process_message task prints the event information.
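As with the SQS example, the original DAG code is not reproduced verbatim; the sketch below shows one way such a DAG can be wired up. It assumes a Kafka connection with the default connection ID kafka_default whose extra contains at least the broker configuration, and that the Common Messaging provider resolves kafka:// queue URIs. Depending on your provider versions, the trigger may require additional arguments, so check the provider documentation.

```python
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, task

# Replace with your broker address and the name of your Kafka topic.
KAFKA_BROKER = "<your bootstrap server>"
KAFKA_TOPIC = "<your topic name>"

# The trigger polls the Kafka topic for new messages; the kafka:// queue URI
# format is an assumption and may differ between provider versions.
trigger = MessageQueueTrigger(queue=f"kafka://{KAFKA_BROKER}/{KAFKA_TOPIC}")

# The asset is updated whenever the watcher's trigger detects a new message.
kafka_topic_asset = Asset(
    "kafka_topic_asset",
    watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)],
)


@dag(schedule=[kafka_topic_asset])
def kafka_event_driven_dag():
    @task
    def process_message(**context):
        # Placeholder task: print the information of every triggering asset
        # event. The message is included in the event's extra dictionary; the
        # exact keys depend on the provider version.
        for asset, events in context["triggering_asset_events"].items():
            for event in events:
                print(f"Asset {asset} was updated with extra: {event.extra}")

    process_message()


kafka_event_driven_dag()
```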