Use Apache Kafka with Apache Airflow
Info
This page has not yet been updated for Airflow 3. The concepts shown are relevant, but some code may need to be updated. If you run any examples, take care to update import statements and watch for any other breaking changes.
Apache Kafka is an open source tool for handling event streaming. Combining Kafka and Airflow allows you to build powerful pipelines that integrate streaming data with batch processing. In this tutorial, you’ll learn how to install and use the Kafka Airflow provider to interact directly with Kafka topics.
Warning
While it is possible to manage a Kafka cluster with Airflow, be aware that Airflow itself should not be used for streaming or low-latency processes. See the Best practices section for more information.
Time to complete
This tutorial takes approximately 1 hour to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of Apache Kafka. See the official Introduction to Kafka.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
Quickstart
If you have a GitHub account, you can use the quickstart repository for this tutorial, which automatically starts up Airflow, initiates a local Kafka cluster, and configures all necessary connections. Clone the quickstart repository and then skip to Step 6: Run the DAGs.
Prerequisites
- A Kafka cluster with a topic. This tutorial uses a cluster hosted by Confluent Cloud, which has a free trial option. See the Confluent documentation for how to create a Kafka cluster and topic in Confluent Cloud.
- The Astro CLI.
Info
To connect a local Kafka cluster to an Airflow instance running in Docker, set the following properties in your Kafka cluster's server.properties file before starting your Kafka cluster (a sketch of the relevant settings is shown below). You can learn more about connecting to local Kafka from within a Docker container in Confluent's documentation.
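The exact properties depend on how your local cluster is set up. The following is a minimal sketch, assuming Kafka runs directly on your host machine and the Airflow containers reach it through host.docker.internal; the listener name DOCKER and port 19092 are illustrative:

```text
# server.properties (sketch): expose one listener for host clients and one for Docker clients
listeners=PLAINTEXT://:9092,DOCKER://:19092

# Advertise the Docker listener via host.docker.internal so Airflow containers can reach the broker
advertised.listeners=PLAINTEXT://localhost:9092,DOCKER://host.docker.internal:19092

# Both listeners use the PLAINTEXT protocol
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,DOCKER:PLAINTEXT
```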
Step 1: Configure your Astro project
- Create a new Astro project.
- Add the required system-level packages to your packages.txt file (the commands and file contents for this section are sketched after this list).
- Add the Kafka Airflow provider to your requirements.txt file.
- Run astro dev start to start your project in a local environment.
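A sketch of this setup follows. The Astro CLI commands are standard; the packages.txt entries (build tools for the Kafka client) and the unpinned provider requirement are assumptions, so adjust them for your environment and pin versions as needed:

```text
# Create a new Astro project in an empty directory
$ astro dev init

# packages.txt -- OS-level dependencies (assumed; some environments need none)
build-essential
librdkafka-dev

# requirements.txt -- the Kafka Airflow provider
apache-airflow-providers-apache-kafka

# Start the project in a local environment (the Airflow UI becomes available at localhost:8080)
$ astro dev start
```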
Step 2: Create two Kafka connections
The Kafka Airflow provider uses a Kafka connection assigned to the kafka_config_id parameter of each operator to interact with a Kafka cluster. For this tutorial, you define two Kafka connections because two different consumers will be created.
- In your web browser, go to localhost:8080 to access the Airflow UI.
- Click Admin > Connections > + to create a new connection.
- Name your connection kafka_default and select the Apache Kafka connection type. Provide the details for the connection to your Kafka cluster as JSON in the Extra field. If you connect to a local Kafka cluster created with the server.properties from the info box in the Prerequisites section, you can use a configuration like the one sketched at the end of this list. The key-value pairs for your connection depend on what kind of Kafka cluster you are connecting to. Most operators in the Kafka Airflow provider require you to define the bootstrap.servers key. You can find a full list of optional connection parameters in the librdkafka documentation.
- Click Save.
- Create a second new connection.
- Name your second connection kafka_listener and select the Apache Kafka connection type. Provide the same details as you did for the kafka_default connection, but set the group.id to group_2. You must have a second connection with a different group.id because the DAGs in this tutorial have two consuming tasks that consume messages from the same Kafka topic. Learn more in Kafka's Consumer Configs documentation.
- Click Save.
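A sketch of the Extra field for both connections, assuming the local cluster from the Prerequisites info box is reachable from inside Docker at host.docker.internal:19092; if you use Confluent Cloud or another hosted cluster, replace the bootstrap server and add the security settings your cluster requires (for example SASL credentials):

```text
# kafka_default (Extra field, JSON)
{
  "bootstrap.servers": "host.docker.internal:19092",
  "group.id": "group_1",
  "security.protocol": "PLAINTEXT",
  "auto.offset.reset": "earliest"
}

# kafka_listener (Extra field, JSON) -- identical except for group.id
{
  "bootstrap.servers": "host.docker.internal:19092",
  "group.id": "group_2",
  "security.protocol": "PLAINTEXT",
  "auto.offset.reset": "earliest"
}
```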
Step 3: Create a DAG with a producer and a consumer task
The Kafka Airflow provider package contains a ProduceToTopicOperator, which you can use to produce messages directly to a Kafka topic, and a ConsumeFromTopicOperator, which you can use to directly consume messages from a topic.
- Create a new file in your dags folder called produce_consume_treats.py.
- Copy and paste the following code into the produce_consume_treats.py file (a sketch of the full DAG appears at the end of this list). This DAG produces messages to a Kafka topic (KAFKA_TOPIC) and consumes them.
  - The produce_treats task retrieves the number of treats (num_treats) to give to your pet from the upstream get_number_of_treats task. Then, the task supplies the number of treats to the producer_function as a positional argument with the producer_function_args parameter. In a similar process, the task also retrieves the name of your pet from the upstream get_your_pet_name task and provides it as a kwarg with producer_function_kwargs.
  - Next, the produce_treats task writes one message for every treat to a Kafka topic. Each message contains the pet name, a randomly picked pet mood after the treat has been given, and whether or not the treat was the last one in a series. The ProduceToTopicOperator accomplishes this by using a function passed to its producer_function parameter, which returns a generator containing key-value pairs.
  - The consume_treats task consumes messages from the same Kafka topic and modifies them to print a string to the logs, using the callable provided to the apply_function parameter. This task also retrieves a value from an upstream task and supplies it as a kwarg to the apply_function with the apply_function_kwargs parameter.
 
- Navigate to the Airflow UI (localhost:8080 if you are running Airflow locally) and manually run your DAG.
- View the produced events in your Kafka cluster. In Confluent Cloud, for example, the topic view for test_topic_1 shows the four messages that were produced.
- View the logs of your consume_treats task, which show a list of the consumed events.
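The original code listing is not reproduced here. The following is a minimal sketch of such a DAG based on the description above; the topic name, pet name, moods, and message limits are assumptions, and the kafka_config_id parameter name matches recent releases of the Kafka Airflow provider.

```python
"""Sketch of produce_consume_treats.py (illustrative, not the original listing)."""

import json
import random

from airflow.decorators import dag, task
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from pendulum import datetime

KAFKA_TOPIC = "test_topic_1"  # assumption: replace with your topic name


def prod_function(num_treats, pet_name):
    """Generator passed to producer_function: yields one key-value pair per treat."""
    for i in range(num_treats):
        yield (
            json.dumps(i),
            json.dumps(
                {
                    "pet_name": pet_name,
                    "pet_mood_post_treat": random.choice(["zoomy", "happy", "sleepy"]),
                    "final_treat": i + 1 == num_treats,
                }
            ),
        )


def consume_function(message, name):
    """Callable passed to apply_function: prints one log line per consumed message."""
    key = json.loads(message.key())
    message_content = json.loads(message.value())
    mood = message_content["pet_mood_post_treat"]
    print(f"Message #{key}: Hello {name}, your pet is {mood} after its treat!")


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def produce_consume_treats():
    @task
    def get_number_of_treats():
        return random.randint(1, 5)

    @task
    def get_your_pet_name():
        return "Nevermore"  # assumption: any pet name works

    produce_treats = ProduceToTopicOperator(
        task_id="produce_treats",
        kafka_config_id="kafka_default",
        topic=KAFKA_TOPIC,
        producer_function=prod_function,
        producer_function_args=[get_number_of_treats()],
        producer_function_kwargs={"pet_name": get_your_pet_name()},
    )

    consume_treats = ConsumeFromTopicOperator(
        task_id="consume_treats",
        kafka_config_id="kafka_default",
        topics=[KAFKA_TOPIC],
        apply_function=consume_function,
        apply_function_kwargs={
            "name": "{{ ti.xcom_pull(task_ids='get_your_pet_name') }}"
        },
        poll_timeout=20,
        max_messages=20,
    )

    produce_treats >> consume_treats


produce_consume_treats()
```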
Info
If you defined a schema for your Kafka topic, the generator needs to return compatible objects. In this example, the generator produces a JSON value.
Tip
The ConsumeFromTopicOperator can replace classical sinks by containing the logic to write messages to a storage destination in its apply_function. This gives you the advantage of being able to use Airflow to schedule message consumption from a Kafka topic based on complex logic embedded in your wider data ecosystem. For example, you can write messages to S3 using the S3CreateObjectOperator and make that task depend on other upstream tasks having completed successfully, such as the creation of a specific S3 bucket. A sketch of this pattern follows.
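As a sketch of the sink pattern, the example below writes each consumed message to S3 from inside the apply_function, using the Amazon provider's S3Hook rather than the S3CreateObjectOperator purely for brevity. The aws_default connection, bucket name, topic, and message limits are assumptions, and the example requires the apache-airflow-providers-amazon package.

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def write_message_to_s3(message):
    """Persist each consumed Kafka message as one object in S3 (illustrative sink logic)."""
    S3Hook(aws_conn_id="aws_default").load_bytes(
        bytes_data=message.value(),
        key=f"kafka_sink/{message.topic()}/{message.offset()}.json",
        bucket_name="my-kafka-sink",  # assumption: an existing bucket
        replace=True,
    )


consume_to_s3 = ConsumeFromTopicOperator(
    task_id="consume_to_s3",
    kafka_config_id="kafka_default",
    topics=["test_topic_1"],
    apply_function=write_message_to_s3,
    poll_timeout=10,
    max_messages=20,
)
```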
Step 4: Create a listener DAG
Airflow can run a function when a specific message appears in your Kafka topic. The AwaitMessageTriggerFunctionSensor is a deferrable operator that listens to your Kafka topic for a message that fulfills specific criteria, which, when met, runs the callable provided to event_triggered_function. The TriggerDagRunOperator can be used within the event_triggered_function to initiate a run of a downstream DAG.
- Create a new file in your dags folder called listen_to_the_stream.py.
- Copy and paste the following code into the file (a sketch of the full DAG appears at the end of this list). This DAG has one task called listen_for_mood, which uses the AwaitMessageTriggerFunctionSensor to listen to messages in all topics supplied to its topics parameter. For each message that is consumed, the following actions are performed:
  - The listen_function supplied to the apply_function parameter of the AwaitMessageTriggerFunctionSensor consumes and processes the message. The listen_function is provided as a dot notation string, which is necessary because the Airflow triggerer component needs to access this function.
  - If the consumed message causes the listen_function to return a value, a TriggerEvent fires.
  - After a TriggerEvent fires, the AwaitMessageTriggerFunctionSensor executes the function provided to the event_triggered_function parameter. In this example, the event_triggered_function starts a downstream DAG using the .execute() method of the TriggerDagRunOperator.
  - After the event_triggered_function completes, the AwaitMessageTriggerFunctionSensor returns to a deferred state.

  The AwaitMessageTriggerFunctionSensor always runs and listens. If the task fails, for example because a malformed message is consumed, the DAG completes as failed and automatically starts its next DAG run because of the @continuous schedule.
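The original listing is not reproduced here; below is a minimal sketch based on the description above. The trigger condition (a pet mood of "zoomy"), the topic name, and the conf payload passed to the downstream DAG are assumptions, and the dot notation path to listen_function assumes the function lives in this same listen_to_the_stream module in your dags folder.

```python
"""Sketch of listen_to_the_stream.py (illustrative, not the original listing)."""

import json

from airflow.decorators import dag
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor
from pendulum import datetime

KAFKA_TOPIC = "test_topic_1"  # assumption: same topic the producer DAG writes to
PET_MOODS_NEEDING_A_WALK = ["zoomy"]  # assumption: moods that should trigger a walk


def listen_function(message, pet_moods_needing_a_walk=None):
    """Return a value only for qualifying messages, which makes a TriggerEvent fire."""
    pet_moods_needing_a_walk = pet_moods_needing_a_walk or []
    message_content = json.loads(message.value())
    pet_name = message_content["pet_name"]
    pet_mood_post_treat = message_content["pet_mood_post_treat"]
    if pet_mood_post_treat in pet_moods_needing_a_walk:
        return pet_name, pet_mood_post_treat


def event_triggered_function(event, **context):
    """Start the downstream walking_my_pet DAG once a qualifying message arrives."""
    pet_name, pet_mood_post_treat = event
    print(f"{pet_name} is {pet_mood_post_treat} and needs a walk!")
    TriggerDagRunOperator(
        task_id=f"triggered_downstream_dag_{context['ts_nodash']}",
        trigger_dag_id="walking_my_pet",
        conf={"pet_name": pet_name},  # assumption: handed to the downstream DAG via conf
    ).execute(context)


@dag(
    start_date=datetime(2024, 1, 1),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
)
def listen_to_the_stream():
    AwaitMessageTriggerFunctionSensor(
        task_id="listen_for_mood",
        kafka_config_id="kafka_listener",
        topics=[KAFKA_TOPIC],
        # Dot notation string so the triggerer component can import the function.
        apply_function="listen_to_the_stream.listen_function",
        apply_function_kwargs={"pet_moods_needing_a_walk": PET_MOODS_NEEDING_A_WALK},
        event_triggered_function=event_triggered_function,
    )


listen_to_the_stream()
```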
Info
When working locally, you need to restart your Airflow instance to apply changes to the apply_function of the AwaitMessageTriggerFunctionSensor because the function is imported into the Triggerer, which does not periodically restart. To restart Airflow, run astro dev restart in your terminal. Changes to the event_triggered_function of the AwaitMessageTriggerFunctionSensor do not require a restart of your Airflow instance. On Astro, the Triggerer is restarted automatically when a new image is deployed, but not on DAG-only deploys; see Deploy DAGs to Astro.
Step 5: Create a downstream DAG
The event_triggered_function of the AwaitMessageTriggerFunctionSensor operator starts a downstream DAG. This example shows how to implement a dependency based on messages that appear in your Kafka topic.
- Create a new file in your dags folder called walking_my_pet.py.
- Copy and paste the following code into the file (a sketch appears after this list). This DAG acts as a downstream dependency of the listen_to_the_stream DAG. You can add any tasks to this DAG.
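The original listing is not reproduced here; a minimal sketch of such a downstream DAG follows. Reading the pet name from the DAG run conf and the randomly chosen walk length are assumptions of this sketch.

```python
"""Sketch of walking_my_pet.py (illustrative, not the original listing)."""

import random

from airflow.decorators import dag, task
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def walking_my_pet():
    @task
    def walking_your_pet(**context):
        # assumption: the listener DAG passes the pet name via the DAG run conf
        pet_name = context["dag_run"].conf.get("pet_name", "your pet")
        walk_length_minutes = random.randint(2, 10)
        print(f"{pet_name} enjoyed a {walk_length_minutes} minute walk!")

    walking_your_pet()


walking_my_pet()
```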
Step 6: Run the DAGs
Now that all three DAGs are ready, run them to see how they work together.
- Make sure you unpause all DAGs in the Airflow UI and that your Kafka cluster is running.
- The listen_to_the_stream DAG immediately starts running after you unpause it, and the listen_for_mood task goes into a Deferred state, which is indicated with a purple square in the Airflow UI.
- Manually run the produce_consume_treats DAG to give your pet some treats and produce a few messages to the Kafka cluster.
- Check the logs of the listen_for_mood task in the listen_to_the_stream DAG to see if a message fitting the criteria defined by the listen_function has been detected. You might need to run the produce_consume_treats DAG a couple of times for such a message to appear. If the TriggerEvent of the listen_for_mood task fires, the listen_for_mood task logs show the walking_my_pet DAG initiating.
- Finally, check the task logs of the walking_my_pet DAG to see how long your pet enjoyed their walk!
Best practices
Apache Kafka is a tool optimized for streaming messages at high frequencies, for example in an IoT application. Airflow is designed to handle orchestration of data pipelines in batches.
Astronomer recommends combining these two open source tools by handling low-latency processes with Kafka and data orchestration with Airflow.
Common patterns include:
- Configuring a Kafka cluster with blob storage, such as S3, as a sink, and batch processing the data from S3 at regular intervals with Airflow.
- Using the ProduceToTopicOperator in Airflow to produce messages to a Kafka cluster as one of several producers.
- Consuming data from a Kafka cluster in batches via the ConsumeFromTopicOperator, using the apply_function to extract and load information to blob storage or a data warehouse.
- Listening for specific messages in a data stream running through a Kafka cluster using the AwaitMessageTriggerFunctionSensor to trigger downstream tasks after the message appears.
Conclusion
Congratulations! You used the Kafka Airflow provider to directly interact with a Kafka topic from within Apache Airflow.