Executing Azure Data Explorer Queries with Airflow
Note: All code in this guide can be found in this GitHub repo.
Azure Data Explorer (ADX) is a managed data analytics service used for performing real-time analysis of large volumes of streaming data. It's particularly useful for IoT applications, big data logging platforms, and SaaS applications.
Using Airflow's built-in Azure Data Explorer Hook and Operator, you can easily integrate ADX queries into your DAGs. In this guide, we'll describe how to configure your ADX cluster to work with Airflow and walk through an example DAG that runs a query against a database in that cluster.
If you don't already have an ADX cluster running and want to follow along with this example, you can find instructions for creating a cluster and loading it with some demo data in Microsoft's documentation.
In order for Airflow to talk to your Azure Data Explorer database, you need to configure service principal authentication. To do this, you create and register an Azure AD service principal, then give that principal permission to access your Azure Data Explorer database. For detailed instructions on how to do this, refer to the Azure documentation.
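If you'd rather script the permission grant, one option is a Kusto management command run with the azure-kusto-data Python client. The sketch below is illustrative only: the cluster URL, application ID, and tenant ID are placeholders, device-code authentication is just one of several auth options, and you can equally paste the .add database command directly into the ADX query editor.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# Placeholder values -- replace with your own cluster URL, app (client) ID, and tenant ID
cluster = "https://<your-cluster>.<region>.kusto.windows.net"
app_id = "<service-principal-application-id>"
tenant_id = "<tenant-id>"

# Authenticate interactively with a device code in order to run the management command
kcsb = KustoConnectionStringBuilder.with_aad_device_authentication(cluster)
client = KustoClient(kcsb)

# Grant the service principal viewer (read/query) access to the demo database
client.execute_mgmt(
    "storm_demo",
    f".add database storm_demo viewers ('aadapp={app_id};{tenant_id}')"
)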
Once you have your Azure Data Explorer cluster running and service principal authentication configured, you can get started querying a database with Airflow.
Note: In Airflow 2.0, provider packages are separate from the core of Airflow. If you are running 2.0 with Astronomer, the apache-airflow-providers-microsoft-azure package is already included in our Airflow Certified Image; if you are not using Astronomer, you may need to install this package separately to use the hooks, operators, and connections described here.
The first step is to set up an Airflow connection to your Azure Data Explorer cluster. If you do this using the Airflow UI, it should look something like this:
For service principal authentication, the required pieces for the connection are: the Connection ID (adx in this example, matching the azure_data_explorer_conn_id used in the DAG below); the cluster URL as the Host; the service principal's client ID as the Login; its client secret as the Password; and the tenant ID and authentication method (for example, AAD_APP) in the Extras field.
For more information on setting up this connection, including available authentication methods, see the ADX hook documentation.
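Before wiring the query into a DAG, you can sanity-check the connection by calling the provider's hook directly. The following is just a sketch: it assumes the adx connection ID and storm_demo database used in the example below, and the run_query signature may differ slightly between provider versions.
from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook

# Use the connection configured in the Airflow UI above
hook = AzureDataExplorerHook(azure_data_explorer_conn_id='adx')

# Run a small test query; primary_results contains the returned result tables
response = hook.run_query(
    query='StormEvents | take 1',
    database='storm_demo',
    options={}
)
print(response.primary_results)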
Next, we define our DAG:
from airflow import DAG
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator
from datetime import datetime, timedelta
adx_query = '''StormEvents
| sort by StartTime desc
| take 10'''
# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}

with DAG('azure_data_explorer',
         start_date=datetime(2020, 12, 1),
         max_active_runs=1,
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False
         ) as dag:

    # Run the KQL query against the storm_demo database using the 'adx' connection
    opr_adx_query = AzureDataExplorerQueryOperator(
        task_id='adx_query',
        query=adx_query,
        database='storm_demo',
        azure_data_explorer_conn_id='adx'
    )
We define the query we want to run, adx_query, and pass it to the AzureDataExplorerQueryOperator along with the name of the database and the connection ID. When we run this DAG, the results of the query will automatically be pushed to XCom. When we go to XComs in the Airflow UI, we can see the data is there:
From here we can build out our DAG with any additional dependent or independent tasks as needed.
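For example, here's a minimal sketch of a dependent task that prints the query results by pulling them from XCom. The print_adx_results callable and print_results task ID are illustrative, not part of the original example, and the operator and dependency would go inside the with DAG(...) block above.
from airflow.operators.python import PythonOperator

def print_adx_results(ti):
    # Pull whatever the adx_query task pushed to XCom and log it
    results = ti.xcom_pull(task_ids='adx_query')
    print(results)

# Inside the `with DAG(...)` block:
opr_print_results = PythonOperator(
    task_id='print_results',
    python_callable=print_adx_results
)

opr_adx_query >> opr_print_results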