Note: All code in this guide can be found in this GitHub repo.

Overview

Azure Data Explorer (ADX) is a managed data analytics service used for performing real-time analysis of large volumes of streaming data. It's particularly useful for IoT applications, big data logging platforms, and SaaS applications.

Using Airflow's built-in Azure Data Explorer Hook and Operator, you can easily integrate ADX queries into your DAGs. In this guide, we'll describe how to configure your ADX cluster to work with Airflow and walk through an example DAG that runs a query against a database in that cluster.

If you don't already have an ADX cluster running and want to follow along with this example, you can find instructions for creating a cluster and loading it with some demo data in Microsoft's documentation.

Configuring ADX to Work with Airflow

In order for Airflow to talk to your Azure Data Explorer database, you need to configure service principal authentication. To do this, you create and register an Azure AD service principal, then give that principal permission to access your Azure Data Explorer database. For detailed instructions on how to do this, refer to the Azure documentation.
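
Once the service principal is created and granted access, it can be worth confirming the credentials work before wiring them into Airflow. Below is a minimal sanity check, assuming a recent version of the azure-kusto-data Python package (the same client library the Azure provider uses under the hood); the cluster URL, IDs, and database name are placeholders you'd replace with your own:

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# Authenticate as the service principal (all values below are placeholders)
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    'https://<cluster>.<region>.kusto.windows.net',  # your cluster URL
    '<client-id>',      # the service principal's application (client) ID
    '<client-secret>',  # the service principal's secret
    '<tenant-id>'       # your Azure AD tenant ID
)
client = KustoClient(kcsb)

# If the permissions are in place, this returns a row count for the demo table
response = client.execute('storm_demo', 'StormEvents | count')
print(response.primary_results[0])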

Running an ADX Query with Airflow

Once you have your Azure Data Explorer cluster running and service principal authentication configured, you can get started querying a database with Airflow.

Note: In Airflow 2.0, provider packages are separate from the core of Airflow. If you are running 2.0 with Astronomer, the apache-airflow-providers-microsoft-azure package is already included in our Airflow Certified Image; if you are not using Astronomer, you may need to install this package separately to use the hooks, operators, and connections described here.

The first step is to set up an Airflow connection to your Azure Data Explorer cluster. If you do this using the Airflow UI, it should look something like this:

ADX Connection

The required pieces for the connection are:

  • Host: your cluster URL
  • Login: your client ID
  • Password: your client secret
  • Extra: should include at least "tenant" with your tenant ID, and "auth_method" with your chosen authentication method. Depending on the auth method, you may also need to specify "certificate" and/or "thumbprint" parameters.

For more information on setting up this connection, including available authentication methods, see the ADX hook documentation.
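
For example, with client ID and secret (service principal) authentication, the Extra field might contain JSON like the following. This is a sketch: the tenant ID is a placeholder, and AAD_APP is the hook's auth method for application key (client ID and secret) authentication:

import json

# Example Extra field for service principal auth (placeholder tenant ID)
extra = json.dumps({
    'tenant': '<tenant-id>',
    'auth_method': 'AAD_APP'
})
print(extra)  # paste the printed JSON into the connection's Extra field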

Next, we define our DAG:

from airflow import DAG
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
from datetime import datetime, timedelta

# KQL query: return the 10 most recent storm events from the demo table
adx_query = '''StormEvents
| sort by StartTime desc
| take 10'''

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}

with DAG('azure_data_explorer',
         start_date=datetime(2020, 12, 1),
         max_active_runs=1,
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False
         ) as dag:

    # Run the KQL query against the storm_demo database via the 'adx' connection
    opr_adx_query = AzureDataExplorerQueryOperator(
        task_id='adx_query',
        query=adx_query,
        database='storm_demo',
        azure_data_explorer_conn_id='adx'
    )

We define the query we want to run, adx_query, and pass it to the AzureDataExplorerQueryOperator along with the name of the database and the connection ID. When we run this DAG, the results of the query are automatically pushed to XCom. When we go to XComs in the Airflow UI, we can see the data is there:

ADX XCom Results

From here we can build out our DAG with any additional dependent or independent tasks as needed.
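
For example, a downstream task could pull the query results out of XCom and process them. Here is a quick sketch using the PythonOperator; the task ID, function name, and print-based processing are made up for illustration, and the new task would live inside the same with DAG block as the query task:

from airflow.operators.python import PythonOperator

# Hypothetical downstream task that reads the query results from XCom;
# Airflow passes the task instance (ti) to the callable automatically
def print_adx_results(ti):
    results = ti.xcom_pull(task_ids='adx_query')
    print(results)

# Inside the with DAG block:
opr_print_results = PythonOperator(
    task_id='print_results',
    python_callable=print_adx_results
)

opr_adx_query >> opr_print_results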
