Apache Airflow® Quickstart

It’s easy to get your pipelines up and running with Apache Airflow®.

This quickstart offers three learning paths. Choose between these popular use cases:

  • Learning Airflow: an introduction to Airflow’s lean and dynamic pipelines-as-Python-code
  • ETL: an introduction to modern, enhanced ETL development with Airflow
  • Generative AI: an introduction to generative AI model development with Airflow

Launch your journey with Airflow by signing up for a trial at astronomer.io! You’ll be able to deploy your projects to Astro at the end of this tutorial.

Other ways to learn

For more help getting started, also check out our step-by-step Get Started with Airflow tutorial.

Time to complete

This quickstart takes approximately 30 minutes to complete.

Assumed knowledge

To get the most out of this quickstart, you should have an understanding of:

  • Basic Python.
  • Basic Airflow concepts, such as DAGs and tasks.

Prerequisites

  • The Astro CLI version 1.25.0 or higher.
  • An integrated development environment (IDE) for Python development, such as VS Code, Sublime Text, or PyCharm.
  • (Optional) A local installation of Python 3 to improve your Python developer experience.

Step 1: Clone the Astronomer Quickstart repository

Learning Airflow
  1. Create a new directory for your project and open it:

    mkdir airflow-quickstart-learning && cd airflow-quickstart-learning
  2. Clone the repository and open it:

    git clone -b learning-airflow --single-branch https://github.com/astronomer/airflow-quickstart.git && cd airflow-quickstart/learning-airflow

    Your directory should have the following structure:

    .
    ├── Dockerfile
    ├── README.md
    ├── dags
    │   └── example_astronauts.py
    ├── include
    ├── packages.txt
    ├── requirements.txt
    ├── solutions
    │   └── example_astronauts_solution.py
    └── tests
        └── dags
            └── test_dag_integrity.py
ETL
  1. Create a new directory for your project and open it:

    mkdir airflow-quickstart-etl && cd airflow-quickstart-etl
  2. Clone the repository and open it:

    git clone -b etl --single-branch https://github.com/astronomer/airflow-quickstart.git && cd airflow-quickstart/etl

    Your directory should have the following structure:

    .
    ├── Dockerfile
    ├── README.md
    ├── dags
    │   └── example_etl_galaxies.py
    ├── include
    │   ├── astronomy.db
    │   ├── custom_functions
    │   │   └── galaxy_functions.py
    │   └── data
    │       └── galaxy_names.txt
    ├── packages.txt
    ├── requirements.txt
    ├── solutions
    │   └── example_etl_galaxies_solution.py
    └── tests
        └── dags
            └── test_dag_example.py
Generative AI
  1. Create a new directory for your project and open it:

    mkdir airflow-quickstart-genai && cd airflow-quickstart-genai
  2. Clone the repository and open it:

    git clone -b generative-ai --single-branch https://github.com/astronomer/airflow-quickstart.git && cd airflow-quickstart/generative-ai

    Your directory should have the following structure:

    .
    ├── Dockerfile
    ├── README.md
    ├── airflow_settings.yaml
    ├── dags
    │   └── example_vector_embeddings.py
    ├── include
    │   ├── custom_functions
    │   │   └── embedding_func.py
    │   └── data
    │       └── galaxy_names.txt
    ├── packages.txt
    ├── requirements.txt
    ├── solutions
    │   └── example_vector_embeddings_solution.py
    └── tests
        └── dags
            └── test_dag_integrity.py

Step 2: Start up Airflow and explore the UI

Learning Airflow
  1. Start the project using the Astro CLI:

    astro dev start

    The CLI will let you know when all Airflow services are up and running.

  2. In your browser, navigate to localhost:8080 and sign in to the Airflow UI using username admin and password admin.

  3. Unpause the example_astronauts DAG.

  4. Explore the DAGs view (the landing page) and individual DAG view page to get a sense of the metadata available about the DAG, run, and all task instances. For a deep-dive into the UI’s features, see An introduction to the Airflow UI.

    For example, the DAGs view will look like this screenshot:

    Airflow UI DAGs view

    As you start to trigger DAG runs, the graph view will look like this screenshot:

    Example Astronauts DAG graph view

    The Gantt chart will look like this screenshot:

    Example Astronauts DAG Gantt chart view

ETL
  1. Start the project using the Astro CLI:

    astro dev start

    The CLI will let you know when all Airflow services are up and running.

  2. In your browser, navigate to localhost:8080 and sign in to the Airflow UI using username admin and password admin.

  3. Unpause the example_etl_galaxies DAG.

  4. Explore the landing page and individual DAG view page to get a sense of the metadata available about the DAG, run, and all task instances. For a deep-dive into the UI’s features, see An introduction to the Airflow UI.

    For example, the DAGs view will look like this screenshot:

    Airflow UI DAGs view

    As you start to trigger DAG runs, the graph view will look like this screenshot:

    Example ETL Galaxies DAG graph view

    The Gantt chart will look like this screenshot:

    Example ETL Galaxies DAG Gantt chart

Generative AI
  1. Start the project using the Astro CLI:

    astro dev start

    The CLI will let you know when all Airflow services are up and running.

  2. In your browser, navigate to localhost:8080 and sign in to the Airflow UI using username admin and password admin.

  3. Unpause the example_vector_embeddings DAG.

  4. Explore the DAGs view (landing page) and individual DAG view page to get a sense of the metadata available about the DAG, run, and all task instances. For a deep-dive into the UI’s features, see An introduction to the Airflow UI.

    For example, the DAGs view will look like this screenshot:

    Airflow UI DAGs view

    As you start to trigger DAG runs, the graph view will look like this screenshot:

    Example Vector Embeddings DAG graph view

    The Gantt chart will look like this screenshot:

    Example Vector Embeddings DAG Gantt chart

Step 3: Explore the project

Learning Airflow

This Astro project introduces you to the basics of orchestrating pipelines with Airflow. You’ll see how easy it is to:

  • get data from data sources.
  • generate tasks automatically and in parallel.
  • trigger downstream workflows automatically.

You’ll build a lean, dynamic pipeline serving a common use case: extracting data from an API and loading it into a database!

This project uses DuckDB, an in-memory database. Although this type of database is great for learning Airflow, your data is not guaranteed to persist between executions!

For production applications, use a persistent database instead (consider DuckDB’s hosted option MotherDuck or another database like Postgres, MySQL, or Snowflake).

Pipeline structure

An Airflow instance can have any number of DAGs (directed acyclic graphs), your data pipelines in Airflow. This project has two:

example_astronauts

This DAG queries the list of astronauts currently in space from the Open Notify API, prints assorted data about the astronauts, and loads data into an in-memory database.

Tasks in the DAG are Python functions decorated using Airflow’s TaskFlow API, which makes it easy to turn arbitrary Python code into Airflow tasks, automatically infer dependencies, and pass data between tasks.
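For orientation, here is a minimal, generic sketch of the TaskFlow pattern. The task names, schedule, and sample data below are illustrative and are not taken from example_astronauts:

    from pendulum import datetime
    from airflow.decorators import dag, task


    # Minimal TaskFlow sketch: names, schedule, and data are illustrative only.
    @dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
    def taskflow_sketch():
        @task
        def extract() -> list[str]:
            # Return values are passed to downstream tasks automatically via XCom
            return ["sample crew member"]

        @task
        def report(people: list[str]) -> None:
            print(f"{len(people)} person(s) in the sample list")

        # Calling one task with another's output lets Airflow infer the dependency
        report(extract())


    taskflow_sketch()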

  • get_astronaut_names and get_astronaut_numbers make a JSON array and an integer available, respectively, to downstream tasks in the DAG.

  • print_astronaut_craft and print_astronauts make use of this data in different ways. print_astronaut_craft uses dynamic task mapping to create a parallel task instance for each astronaut in the list retrieved from the API. Airflow lets you do this with just two lines of code:

    print_astronaut_craft.partial(greeting="Hello! :)").expand(
        person_in_space=get_astronaut_names()
    ),

    The key feature is the expand() function, which makes the DAG automatically adjust the number of tasks each time it runs.

  • create_astronauts_table_in_duckdb and load_astronauts_in_duckdb create a DuckDB database table for some of the data and load the data, respectively.

example_extract_astronauts

This DAG queries the database you created for astronaut data in example_astronauts and prints out some of this data. Changing a single line of code in this DAG can make it run automatically when the other DAG completes a run.
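As a rough illustration of what such a query task can look like, the following sketch reads rows from a DuckDB file inside a plain @task. The database path and table name here are placeholders, not the exact values used in example_extract_astronauts:

    import duckdb
    from airflow.decorators import task


    # Placeholder path and table name; the real DAG reads the table created by example_astronauts.
    @task
    def print_astronaut_rows(db_path: str = "include/astronauts.db") -> None:
        cursor = duckdb.connect(db_path)
        for row in cursor.sql("SELECT * FROM astronauts;").fetchall():
            print(row)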

ETL

Building Extract, Transform, and Load (ETL) workloads is a common pattern in Apache Airflow. This project shows an example pattern for defining an ETL workload using DuckDB as the data warehouse of choice.

As you try out this project, you’ll see how easy Airflow makes it to:

  • write responsive pipelines that change based on user inputs.
  • perform database operations using SQL.
  • access and extract data from local files.
  • execute transformations with Pandas.

You’ll write a lean ETL pipeline in easy-to-read Python code!

This project uses DuckDB, an in-memory database. Although this type of database is great for learning Airflow, your data is not guaranteed to persist between executions!

For production applications, use a persistent database instead (consider DuckDB’s hosted option MotherDuck or another database like Postgres, MySQL, or Snowflake).

Pipeline structure

An Airflow project can have any number of DAGs (directed acyclic graphs), the main building blocks of Airflow pipelines. This project has one:

example_etl_galaxies

This DAG contains five @task-decorated Python functions:

  • create_galaxy_table_in_duckdb uses a hook to create a database connection and a SQL query to create a database table.

  • extract_galaxy_data returns a dataframe created using a modularized function imported from the project’s include directory.

  • transform_galaxy_data gets a user-specified value from the DAG context and uses it to execute a simple data transformation on the dataframe, returning another dataframe.

  • load_galaxy_data uses a database hook to load the dataframe into the database. You can load the dataframe directly in the context of a SQL query, with no conversion of the dataframe required (see the sketch after this list).

  • print_loaded_galaxies executes a SELECT query on the database and prints the data to the logs.
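The following sketch approximates the create-and-load pattern described above using the duckdb package directly rather than the project's hook, and the table layout is trimmed for brevity, so treat it as an outline rather than the DAG's actual code:

    import duckdb
    import pandas as pd
    from airflow.decorators import task


    # Sketch only: the real tasks use a DuckDB hook and the project's full galaxy_data schema.
    @task
    def create_galaxy_table_sketch(db_path: str = "include/astronomy.db") -> None:
        conn = duckdb.connect(db_path)  # stands in for the hook-provided connection
        conn.sql(
            "CREATE TABLE IF NOT EXISTS galaxy_data (name VARCHAR PRIMARY KEY, distance_from_milkyway INT);"
        )


    @task
    def load_galaxy_data_sketch(db_path: str = "include/astronomy.db") -> None:
        galaxy_df = pd.DataFrame(
            {"name": ["Andromeda"], "distance_from_milkyway": [2537000]}
        )
        conn = duckdb.connect(db_path)
        # DuckDB can reference the in-scope dataframe by name, so the dataframe is
        # loaded directly in the context of a SQL query, with no conversion required.
        conn.sql("INSERT INTO galaxy_data SELECT * FROM galaxy_df;")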

Generative AI

Apache Airflow is one of the most common orchestration engines for AI/Machine Learning jobs, especially for retrieval-augmented generation (RAG). This project shows a simple example of building vector embeddings for text and then performing a semantic search on the embeddings.

The DAG (directed acyclic graph) in the project demonstrates how to leverage Airflow’s automation and orchestration capabilities to:

  • orchestrate a generative AI pipeline.
  • compute vector embeddings of words using Python’s SentenceTransformers library.
  • compare the embeddings of a word of interest to a list of words to find the semantically closest match.

You’ll write a user-customizable generative-AI pipeline in easy-to-read Python code!

This project uses DuckDB, an in-memory database, to store the word embeddings. Although this type of database is great for learning Airflow, your data is not guaranteed to persist between executions!

For production applications, use a persistent database instead (consider DuckDB’s hosted option MotherDuck or another database like Postgres, MySQL, or Snowflake).

Pipeline structure

An Airflow project can have any number of DAGs (directed acyclic graphs), the main building blocks of Airflow pipelines. This project has one:

example_vector_embeddings

This DAG contains six tasks:

  • get_words gets a list of words from the context to embed.

  • create_embeddings creates embeddings for the list of words (see the sketch after this list).

  • create_vector_table creates a table in the DuckDB database and an HNSW index on the embedding vector.

  • insert_words_into_db inserts the words and embeddings into the table.

  • embed_word embeds a single word and returns the embeddings.

  • find_closest_word_match finds the closest match to a word of interest.
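Outside of Airflow, the embedding and matching logic these tasks orchestrate boils down to a few lines of SentenceTransformers code. The sketch below uses the project's default model, but the word lists are illustrative, and the real DAG stores its embeddings in DuckDB rather than comparing them in memory:

    from sentence_transformers import SentenceTransformer, util

    # Illustrative word lists; the model name matches the project's default.
    model = SentenceTransformer("all-MiniLM-L6-v2")

    words = ["sun", "rocket", "planet", "light", "banana"]
    word_of_interest = "star"

    word_embeddings = model.encode(words)              # one vector per word
    target_embedding = model.encode(word_of_interest)  # vector for the word of interest

    # Cosine similarity between the target word and every candidate word
    scores = util.cos_sim(target_embedding, word_embeddings)[0]
    best_word, best_score = max(zip(words, scores.tolist()), key=lambda pair: pair[1])
    print(f"Closest match to '{word_of_interest}': {best_word} ({best_score:.3f})")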

Step 4: Get your hands dirty!

Learning Airflow

With Airflow, it’s easy to create cross-workflow dependencies. In this step, you’ll learn how to:

  • use Airflow Datasets to create a dependency between DAGs so that when one workflow ends, another begins. To do this, you’ll modify the example_extract_astronauts DAG to use a Dataset to trigger a DAG run when the example_astronauts DAG updates the table that both DAGs query.

Schedule the example_extract_astronauts DAG on an Airflow Dataset

With Datasets, DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to these datasets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron. Downstream DAGs can be scheduled based on combinations of Dataset updates coming from tasks in the same Airflow instance or calls to the Airflow API.

  1. Define the get_astronaut_names task as a producer of a Dataset. To do this, pass a Dataset object, encapsulated in a list, to the task’s outlets parameter:

    @task(
        outlets=[Dataset("current_astronauts")]
    )
    def get_astronaut_names(**context) -> list[dict]:

    For more information about Airflow Datasets, see: Datasets and data-aware scheduling in Airflow.

  2. Schedule a downstream DAG run using an Airflow Dataset:

    Now that you have defined the get_astronaut_names task in the example_astronauts DAG as a Dataset producer, you can use that Dataset to schedule downstream DAG runs.

    Datasets function like an API to communicate when data at a specific location in your ecosystem is ready for use, reducing the code required to create cross-DAG dependencies. For example, with an import and a single line of code, you can schedule a DAG to run when another DAG in the same Airflow environment has updated a Dataset.

    To schedule the example_extract_astronauts DAG to run when example_astronauts updates the current_astronauts Dataset, add an import statement to make the Airflow Dataset package available:

    from airflow import Dataset
  3. Then, set the DAG’s schedule using the current_astronauts Dataset:

    schedule=[Dataset("current_astronauts")],
  4. Rerun the example_astronauts DAG in the UI and check the status of the tasks in the individual DAG view. Watch as the example_extract_astronauts DAG gets triggered automatically when example_astronauts finishes running.

    If all goes well, the graph view of the Dataset-triggered DAG run will look like this screenshot:

    Dataset-triggered run graph view

    For more information about Airflow Datasets, see: Datasets and data-aware scheduling in Airflow.
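Putting the steps above together, a condensed sketch of the producer/consumer pattern looks like the following. DAG and task bodies are trimmed to the scheduling-relevant lines, and names other than current_astronauts and get_astronaut_names are illustrative:

    from pendulum import datetime
    from airflow import Dataset
    from airflow.decorators import dag, task

    current_astronauts = Dataset("current_astronauts")


    @dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
    def producer_sketch():
        @task(outlets=[current_astronauts])  # this task updates the Dataset
        def get_astronaut_names() -> list[dict]:
            return [{"name": "sample astronaut", "craft": "ISS"}]

        get_astronaut_names()


    @dag(start_date=datetime(2024, 1, 1), schedule=[current_astronauts], catchup=False)
    def consumer_sketch():
        @task
        def report() -> None:
            print("Triggered because current_astronauts was updated")

        report()


    producer_sketch()
    consumer_sketch()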

ETL

With Airflow, you can connect to many external systems and create dynamic and responsive workflows. In this step, you’ll learn how to create a connection to an external system.

Create a connection to an external system

Creating connections to interoperate with the many systems Airflow supports is easy.

In the steps that follow, you’ll create a connection in the Airflow UI and use it in a new DAG. You can use your own preferred external system or use Postgres for a local option.

  1. A Postgres database has already been added to the project for you. Confirm that port 5435 is available on your machine, or, if not, modify the external port in the project’s docker-compose.override.yml:

    version: "3.1"
    services:
      example-db:
        image: postgres:14.9
        ports:
          - "5435:5432"
        environment:
          - POSTGRES_USER=example
          - POSTGRES_PASSWORD=example
          - POSTGRES_DB=example
  2. Create a connection in the UI.

    Airflow supports a number of ways to create connections, but most users choose the UI.

    To create a connection in the UI, go to Admin > Connections.

    Connections seen from the DAG view

    Your connection should look like this screenshot, with the password being example:

    Postgres connection in UI

  3. Add a DAG to the project.

    Create a new Python file in the dags directory and paste this DAG code there:

    import os  # needed for the os.getenv calls below
    import duckdb  # needed for duckdb.connect in the extract task

    from airflow.decorators import (
        dag,
        task,
    )  # This DAG uses the TaskFlow API. See: https://www.astronomer.io/docs/learn/airflow-decorators
    from pendulum import datetime, duration
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from include.custom_functions.load_functions import get_sql_query
    from airflow.models.dataset import Dataset

    _DUCKDB_INSTANCE_NAME = os.getenv("DUCKDB_INSTANCE_NAME", "include/astronomy.db")
    _TABLE_NAME = os.getenv("_TABLE_NAME", "galaxy_data")
    _DUCKDB_TABLE_NAME = os.getenv("DUCKDB_TABLE_NAME", "galaxy_data")
    _DUCKDB_TABLE_URI = f"duckdb://{_DUCKDB_INSTANCE_NAME}/{_DUCKDB_TABLE_NAME}"


    @dag(
        start_date=datetime(2024, 7, 1),  # Date after which the DAG can be scheduled
        schedule=[Dataset(_DUCKDB_TABLE_URI)],  # See: https://www.astronomer.io/docs/learn/scheduling-in-airflow for options
        catchup=False,  # See: https://www.astronomer.io/docs/learn/rerunning-dags#catchup
        max_consecutive_failed_dag_runs=5,  # auto-pauses the DAG after 5 consecutive failed runs, experimental
        max_active_runs=1,  # Allow only one concurrent run of this DAG, prevents parallel DuckDB calls
        doc_md=__doc__,  # Add DAG Docs in the UI, see https://www.astronomer.io/docs/learn/custom-airflow-ui-docs-tutorial
        default_args={
            "owner": "Astro",  # owner of this DAG in the Airflow UI
            "retries": 3,  # tasks retry 3 times before they fail
            "retry_delay": duration(seconds=30),  # tasks wait 30s in between retries
        },  # default_args are applied to all tasks in a DAG
        tags=["example", "ETL"],  # Add tags in the UI
        # Warning - in-memory DuckDB is not a persistent database between workers. To move this workflow into production, use a
        # cloud-based database and, based on concurrency capabilities, adjust the two parameters below.
        concurrency=1,  # allow only a single task execution at a time, prevents parallel DuckDB calls
        is_paused_upon_creation=False,  # start running the DAG as soon as it's created
    )
    def example_etl_galaxies_load():  # By default, the dag_id is the name of the decorated function

        @task
        def extract_galaxy_data_duckdb(
            duckdb_instance_name: str = _DUCKDB_INSTANCE_NAME,
            table_name: str = _TABLE_NAME,
        ):
            cursor = duckdb.connect(duckdb_instance_name)
            galaxy_data_df = cursor.sql(f"SELECT * FROM {table_name};").df()

            return galaxy_data_df

        @task
        def create_sql_query(df):
            sql_str = get_sql_query(df, _TABLE_NAME)
            return sql_str

        create_galaxy_table_postgres = SQLExecuteQueryOperator(
            task_id="create_galaxy_table_postgres",
            conn_id="postgres_default",
            sql=f"""
                DROP TABLE IF EXISTS {_TABLE_NAME};
                CREATE TABLE {_TABLE_NAME} (
                    name VARCHAR PRIMARY KEY,
                    distance_from_milkyway INT,
                    distance_from_solarsystem INT,
                    type_of_galaxy VARCHAR,
                    characteristics VARCHAR
                );
            """,
        )

        create_sql_query_obj = create_sql_query(extract_galaxy_data_duckdb())

        load_galaxy_data_postgres = SQLExecuteQueryOperator(
            task_id="load_galaxy_data_postgres",
            conn_id="postgres_default",
            sql=create_sql_query_obj,
        )

        create_galaxy_table_postgres >> load_galaxy_data_postgres


    example_etl_galaxies_load()

    This DAG extracts data from the project’s DuckDB database, creates a table in the project’s Postgres database, and loads the data into the table. Using an Airflow Dataset trigger, it will run when example_etl_galaxies updates the galaxy_data dataset.

    The name of the connection parameter varies between operators. In the case of SQLExecuteQueryOperator, it is conn_id:

    load_galaxy_data_postgres = SQLExecuteQueryOperator(
        task_id="load_galaxy_data_postgres",
        conn_id="postgres_default",
        sql=create_sql_query_obj,
    )
  4. Trigger your new DAG!

    Trigger the example_etl_galaxies DAG, not the new DAG you just added. Your new DAG will run automatically after the load_galaxy_data task in example_etl_galaxies completes successfully.

    If all goes well, the graph view will look like this screenshot:

    Postgres DAG graph in UI

For more guidance on getting started with connections, see: Integrations & connections.

Generative AI

With Airflow, it’s easy to test and compare language models (LMs) until you find the right model for your generative AI workflows. In this step, you’ll learn how to:

  • configure a DAG to use different LMs.
  • use the Airflow UI to compare the performance of the models you select.

Experiment with different LMs to compare performance

Sentence Transformers (AKA SBERT) is a popular Python module for accessing, using, and training text and image embedding models. It enables a wide range of AI applications, including semantic search, semantic textual similarity, and paraphrase mining. SBERT provides various pre-trained language models via the Sentence Transformers Hugging Face organization. Additionally, over 6,000 community Sentence Transformers models have been publicly released on the Hugging Face Hub.

Try using a different language model from among those provided by SBERT in this project’s DAG. Then, explore the metadata in the Airflow UI to compare the performance of the models.
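If you want a quick feel for the differences before changing the DAG, you can compare models locally with a few lines of SentenceTransformers code. The sketch below is a rough local experiment, not part of the project; the model names come from SBERT’s pretrained list, and timings will vary by machine:

    import time

    from sentence_transformers import SentenceTransformer

    # Small illustrative word list; swap in any models from SBERT's pretrained list.
    words = ["sun", "rocket", "planet", "light", "banana"]

    for model_name in ["all-MiniLM-L6-v2", "distiluse-base-multilingual-cased-v2"]:
        model = SentenceTransformer(model_name)
        start = time.perf_counter()
        embeddings = model.encode(words)
        elapsed = time.perf_counter() - start
        print(f"{model_name}: {embeddings.shape[1]} dimensions, encoded in {elapsed:.2f}s")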

  1. Start your experiment by using a different model. Find the _LM variable definition in the get_embeddings_one_word function close to the top of the example_vector_embeddings DAG and replace the model string with distiluse-base-multilingual-cased-v2:

    _LM = os.getenv("LM", "distiluse-base-multilingual-cased-v2")

    The default is very fast, but this one is slower and lower-performing overall, so the results should be noticeably different. You could also try a model with higher overall performance, such as all-mpnet-base-v2. For a list of possible models to choose from, see SBERT’s Pretrained models list.

  2. Next, find the dimensions of the model in the SBERT docs.

    For example, the distiluse-base-multilingual-cased-v2 model has dimensions of 512:

    SBERT model information from SBERT docs

  3. Use this number to redefine another top-level variable, _LM_DIMENSIONS:

    _LM_DIMENSIONS = os.getenv("LM_DIMS", "512")

    This value is used in the vector column type definition in the create_vector_table task and the select query in the find_closest_word_match task.

  4. Rerun the DAG. Depending on the models you choose, you might see large differences in the performance of the create_embeddings task.

    For example, using the default all-MiniLM-L6-v2 model should result in runtimes of around 4s:

    Default LM task Gantt chart

    By contrast, using the distiluse-base-multilingual-cased-v2 model might result in runtimes three times as long or longer:

    Less performant LM task Details view

  5. Check the log output from the find_closest_word_match task and look for differences between the search result sets.

    For example, the faster LM all-MiniLM-L6-v2 returns sun, planet, light:

    Default LM semantic search results in task logs

    The more performant LM all-mpnet-base-v2 returns sun, rocket, planet:

    Alternative LM semantic search results in task logs

For more information about the SBERT project, library, and models, see the Sentence Transformers Documentation.

Next steps: run Airflow on Astro

The easiest way to run Airflow in production is with Astro. To get started, create an Astro trial. During your trial signup, you will have the option of choosing the same template project you worked with in this quickstart.