Orchestrate dbt Core projects with Airflow and Cosmos

dbt Core is an open-source library for analytics engineering that helps users build interdependent SQL models for in-warehouse data transformation, using the ephemeral compute of data warehouses. Cosmos is an open-source package developed by Astronomer that runs the dbt models of a dbt Core project as tasks within Airflow.

dbt on Airflow with Cosmos and the Astro CLI

The open-source provider package Cosmos allows you to integrate dbt jobs into Airflow by automatically creating Airflow tasks from dbt models. You can turn your dbt Core projects into Airflow Dags or task groups with just a few lines of code.

You can find comprehensive instructions on how to set up Cosmos for different data warehouses, Cosmos configuration options, and how to optimize Cosmos performance in the Orchestrating dbt with Apache Airflow® using Cosmos eBook and a shorter summary of the most important concepts in the Quick Notes: Airflow + dbt with Cosmos.

Why use Airflow with dbt Core?

dbt Core offers the possibility to build modular, reusable SQL components with built-in dependency management and incremental builds.

With Cosmos, you can integrate dbt jobs into your open-source Airflow orchestration environment as standalone Dags or as task groups within Dags.

The benefits of using Airflow with dbt Core include:

  • Use Airflow’s data-aware scheduling and Airflow sensors to run models depending on other events in your data ecosystem.
  • Turn each dbt model into a task, complete with Airflow features like retries and error notifications, as well as full observability into past runs directly in the Airflow UI.
  • Run dbt test on tables created by individual models immediately after a model has completed. Catch issues before moving downstream and integrate additional data quality checks with your preferred tool to run alongside dbt tests.
  • Run dbt projects using Airflow connections instead of dbt profiles. You can store all your connections in one place, directly within Airflow or by using a secrets backend.
  • Leverage native support for installing and running dbt in a virtual environment to avoid dependency conflicts with Airflow.
  • Generate and host dbt docs with Airflow.

With Astro, you get all the above benefits and you can deploy your dbt project to your Astro Deployment independently of your Airflow project using the Astro CLI. For more information, see Deploy dbt projects to Astro.

Time to complete

This tutorial takes approximately 30 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

  • The basics of dbt Core.
  • Airflow fundamentals, such as writing Dags and defining tasks.
  • Airflow task groups.
  • Airflow connections.

Prerequisites

  • The Astro CLI.
  • Access to a data warehouse supported by dbt Core. See dbt documentation for all supported warehouses. This tutorial uses a Postgres database.

You do not need to have dbt Core installed locally in order to complete this tutorial.

Step 1: Configure your Astro project

To use dbt Core with Airflow, create a new Astro project and install Cosmos together with a dbt adapter for your data warehouse, either directly in your Airflow environment or in a separate virtual environment.

  1. Create a new Astro project:

    $ mkdir astro-dbt-core-tutorial && cd astro-dbt-core-tutorial
    $ astro dev init
  2. Add Cosmos, the Airflow Postgres provider and the dbt Postgres adapter to your Astro project requirements.txt file. If you are using a different data warehouse, replace apache-airflow-providers-postgres and dbt-postgres with the provider package for your data warehouse. You can find information on all provider packages on the Astronomer registry.

    astronomer-cosmos
    apache-airflow-providers-postgres
    apache-airflow-providers-common-sql
    dbt-postgres
  3. (Alternative) If you cannot install your dbt adapter in the same environment as Airflow due to package conflicts, you can create a dbt executable in a virtual environment instead. Add the following lines to the end of your Dockerfile:

    # replace dbt-postgres with another supported adapter if you're using a different warehouse type
    RUN python -m venv dbt_venv && source dbt_venv/bin/activate && \
    pip install --no-cache-dir dbt-postgres && deactivate

    This command runs when the Docker image is built and creates a virtual environment called dbt_venv inside the Astro CLI scheduler container. The dbt-postgres package, which also contains dbt-core, is installed in that virtual environment. If you are using a different data warehouse, replace dbt-postgres with the adapter package for your data warehouse.
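
    If you use this virtual environment, you later need to tell Cosmos where to find the dbt executable. The following minimal sketch reuses the dbt_venv path created above; the Dag in Step 4 includes the same configuration as commented-out lines:

    # Point Cosmos at the dbt executable created in the Dockerfile above
    import os

    from cosmos import ExecutionConfig

    venv_execution_config = ExecutionConfig(
        dbt_executable_path=f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt",
    )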

There are other options to run Cosmos even if you cannot install your dbt adapter in the requirements.txt file or create a virtual environment in your Docker image. See the Cosmos documentation on execution modes for more information.

Step 2: Prepare your dbt project

To integrate your dbt project with Airflow, you need to add the project folder to your Airflow environment. For this step you can either add your own project or follow the steps below to create a simple project using two models.

  1. Create a folder called dbt in your include folder.

  2. In the dbt folder, create a folder called my_simple_dbt_project.

  3. In the my_simple_dbt_project folder add your dbt_project.yml. This configuration file needs to contain at least the name of the project. This tutorial additionally shows how to inject a variable called my_name from Airflow into your dbt project.

    version: '0.1'
    name: 'my_simple_dbt_project'
    vars:
      my_name: "No entry"
  4. Add your dbt models in a subfolder called models in the my_simple_dbt_project folder. You can add as many models as you want to run. This tutorial uses the following two models:

    model1.sql:

    SELECT '{{ var("my_name") }}' as name

    model2.sql:

    SELECT * FROM {{ ref('model1') }}

    model1.sql selects the variable my_name. model2.sql depends on model1.sql and selects everything from the upstream model.

You should now have the following structure within your Airflow environment:

.
└── dags
└── include
    └── dbt
        └── my_simple_dbt_project
            ├── dbt_project.yml
            └── models
                ├── model1.sql
                └── model2.sql

If storing your dbt project alongside your Airflow project is not feasible, there are other ways to use Cosmos, even if the dbt project is hosted in a different location, for example by using a manifest file to parse the project and a containerized execution mode. See the Cosmos documentation for more information.

Step 3: Create an Airflow connection to your data warehouse

Cosmos allows you to apply Airflow connections to your dbt project.

  1. Start Airflow by running astro dev start.

  2. In the Airflow UI, go to Admin -> Connections and click +.

  3. Create a new connection named db_conn. Select the connection type and fill in the connection parameters based on the data warehouse you are using. For a Postgres connection, enter the following information:

    • Connection ID: db_conn.
    • Connection Type: Postgres.
    • Host: Your Postgres host address.
    • Schema: Your Postgres database.
    • Login: Your Postgres login username.
    • Password: Your Postgres password.
    • Port: Your Postgres port.

If a connection type for your database isn’t available, you might need to make it available by adding the relevant provider package to requirements.txt and running astro dev restart.

Step 4: Write your Airflow Dag

The Dag you’ll write uses Cosmos to create tasks from your existing dbt models and the SQLExecuteQueryOperator to query a table created by one of the models. You can add more upstream and downstream tasks to embed the dbt project within other actions in your data ecosystem.

  1. In your dags folder, create a file called my_simple_dbt_dag.py.

  2. Copy and paste the following Dag code into the file:

    """
    ### Run a dbt Core project as a task group with Cosmos

    Simple DAG showing how to run a dbt project as a task group, using
    an Airflow connection and injecting a variable into the dbt project.
    """

    from airflow.sdk import dag, chain
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig

    # adjust for other database types
    from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping
    import os

    YOUR_NAME = "YOUR_NAME"
    CONNECTION_ID = "db_conn"
    DB_NAME = "YOUR_DB_NAME"
    SCHEMA_NAME = "YOUR_SCHEMA_NAME"
    MODEL_TO_QUERY = "model2"
    # The path to the dbt project
    DBT_PROJECT_PATH = f"{os.environ['AIRFLOW_HOME']}/include/dbt/my_simple_dbt_project"

    # OPTIONAL: The path where Cosmos will find the dbt executable
    # in the virtual environment created in the Dockerfile if you cannot
    # install your dbt adapter in requirements.txt due to package conflicts.
    # DBT_EXECUTABLE_PATH = f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt"

    profile_config = ProfileConfig(
        profile_name="default",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id=CONNECTION_ID,
            profile_args={"schema": SCHEMA_NAME},
        ),
    )

    # OPTIONAL: An execution config pointing Cosmos to the dbt executable above
    # execution_config = ExecutionConfig(
    #     dbt_executable_path=DBT_EXECUTABLE_PATH,
    # )


    @dag(
        params={"my_name": YOUR_NAME},
    )
    def my_simple_dbt_dag():
        transform_data = DbtTaskGroup(
            group_id="transform_data",
            project_config=ProjectConfig(DBT_PROJECT_PATH),
            profile_config=profile_config,
            # OPTIONAL: your execution config if you are using a virtual environment
            # execution_config=execution_config,
            operator_args={
                "vars": '{"my_name": {{ params.my_name }} }',
            },
            default_args={"retries": 2},
        )

        query_table = SQLExecuteQueryOperator(
            task_id="query_table",
            conn_id=CONNECTION_ID,
            sql=f"SELECT * FROM {DB_NAME}.{SCHEMA_NAME}.{MODEL_TO_QUERY}",
        )

        chain(transform_data, query_table)


    my_simple_dbt_dag()

    This Dag uses the DbtTaskGroup class from the Cosmos package to create a task group from the models in your dbt project. Dependencies between your dbt models are automatically turned into dependencies between Airflow tasks. Make sure to add your own values for YOUR_NAME, YOUR_DB_NAME, and YOUR_SCHEMA_NAME.

    Using the vars keyword in the dictionary provided to the operator_args parameter, you can inject variables into the dbt project. This DAG injects YOUR_NAME for the my_name variable. If your dbt project contains dbt tests, they will be run directly after a model has completed. Note that it is a best practice to set retries to at least 2 for all tasks that run dbt models.

In some cases, especially in larger dbt projects, you might run into a DagBag import timeout error. You can resolve this error by increasing the value of the Airflow configuration core.dagbag_import_timeout, for example by setting the AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT environment variable.
  3. Run the Dag manually by clicking the play button and view the Dag in the graph view. Expand the task groups to see all tasks.

    Cosmos Dag graph view

  4. Check the XCom returned by the query_table task to see your name in the model2 table.

The DbtTaskGroup class populates an Airflow task group with tasks created from dbt models inside a regular Dag. To define a full Dag containing only dbt models, use the DbtDag class instead, as shown in the Cosmos documentation.
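
As a quick illustration, the following sketch shows roughly what the same project could look like as a standalone Dag built with DbtDag. It assumes DBT_PROJECT_PATH and profile_config are defined as in the Dag from Step 4; check the exact arguments against the Cosmos documentation for your version:

# A minimal sketch of a standalone dbt Dag, not a drop-in replacement for the Dag above.
# DBT_PROJECT_PATH and profile_config are assumed to be defined as in Step 4.
from cosmos import DbtDag, ProjectConfig

my_simple_dbt_standalone_dag = DbtDag(
    dag_id="my_simple_dbt_standalone_dag",
    project_config=ProjectConfig(DBT_PROJECT_PATH),
    profile_config=profile_config,
    schedule=None,
    default_args={"retries": 2},
)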

Congratulations! You’ve run a Dag using Cosmos to automatically create tasks from dbt models. You can learn more about how to configure Cosmos in the Cosmos documentation.

If you are running large dbt projects and want to increase performance, there are several options available to you. A recent feature is the experimental watcher execution mode that can reduce Dag execution time by up to 80% and reaches speeds on par with running dbt build with the dbt CLI. See the Cosmos documentation for more information.

Alternative ways to run dbt Core with Airflow

While using Cosmos is recommended, there are several other ways to run dbt Core with Airflow.

Using the BashOperator

You can use the BashOperator to execute specific dbt commands. It’s recommended to run dbt-core and the dbt adapter for your database in a virtual environment because there are often dependency conflicts between dbt and other packages.

The Dag below uses the BashOperator to activate the virtual environment and execute dbt run for a dbt project.

from airflow.sdk import dag
from airflow.providers.standard.operators.bash import BashOperator

PATH_TO_DBT_PROJECT = "<path to your dbt project>"
PATH_TO_DBT_VENV = "<path to your venv activate binary>"


@dag
def simple_dbt_dag():
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="source $PATH_TO_DBT_VENV && dbt run --models .",
        env={"PATH_TO_DBT_VENV": PATH_TO_DBT_VENV},
        cwd=PATH_TO_DBT_PROJECT,
    )


simple_dbt_dag()

Using the BashOperator to run dbt run and other dbt commands can be useful during development. However, running dbt at the project level has a couple of issues:

  • There is low observability into what execution state the project is in.
  • Failures are absolute and require all models in a project to be run again, which can be costly.
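
One way to reduce the impact of these issues without Cosmos is to run dbt at the model level by creating one BashOperator task per model using dbt's node selection syntax. The sketch below is an illustrative variation of simple_dbt_dag above and reuses its imports and path variables; dbt build is used so that tests for each selected model run as well:

# Illustrative only: one task per model inside the simple_dbt_dag function above,
# reusing PATH_TO_DBT_PROJECT and PATH_TO_DBT_VENV
dbt_build_model1 = BashOperator(
    task_id="dbt_build_model1",
    bash_command="source $PATH_TO_DBT_VENV && dbt build --select model1",
    env={"PATH_TO_DBT_VENV": PATH_TO_DBT_VENV},
    cwd=PATH_TO_DBT_PROJECT,
)

dbt_build_model2 = BashOperator(
    task_id="dbt_build_model2",
    bash_command="source $PATH_TO_DBT_VENV && dbt build --select model2",
    env={"PATH_TO_DBT_VENV": PATH_TO_DBT_VENV},
    cwd=PATH_TO_DBT_PROJECT,
)

# model2 depends on model1, so the tasks need an explicit dependency
dbt_build_model1 >> dbt_build_model2

This approach quickly becomes verbose for larger projects because you have to mirror the dbt dependency graph by hand, which is exactly what Cosmos automates.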

Using a manifest file

Using a dbt-generated manifest.json file gives you more visibility into the steps dbt is running in each task. This file is generated in the target directory of your dbt project and contains its full representation. For more information on this file, see the dbt documentation.

Cosmos can parse manifest files; see the Cosmos documentation for more information.
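
For example, with a recent Cosmos version a manifest-based task group could be configured roughly as follows; treat the parameter names and the manifest path as assumptions to verify against the Cosmos documentation for your version:

# Illustrative sketch: build the task group from a pre-generated manifest.json
# instead of parsing the dbt project files at Dag parse time
import os

from cosmos import DbtTaskGroup, ProjectConfig, RenderConfig
from cosmos.constants import LoadMode

manifest_project_config = ProjectConfig(
    manifest_path=f"{os.environ['AIRFLOW_HOME']}/include/dbt/my_simple_dbt_project/target/manifest.json",
    project_name="my_simple_dbt_project",
)

transform_data = DbtTaskGroup(
    group_id="transform_data",
    project_config=manifest_project_config,
    render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST),
    profile_config=profile_config,  # as defined in Step 4
)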