
Understanding the Airflow Metadata Database

Overview

The metadata database is a core component of Airflow. It stores crucial information like the configuration of your Airflow environment’s roles and permissions, as well as all metadata for past and present DAG and task runs.

A healthy metadata database is critical for your Airflow environment. Losing data stored in the metadata database can both interfere with running DAGs and prevent you from accessing data for past DAG runs. As with any core Airflow component, having a backup and disaster recovery plan in place for the metadata database is essential.

In this guide, we will explain everything you need to know about the Airflow metadata database to ensure a healthy Airflow environment, including:

  • Database specifications.
  • Important content stored in the database.
  • Best practices for using the metadata database.
  • Different methods for accessing data of interest.

Note: We strongly advise against directly modifying the metadata database, as this can cause dependency issues and corrupt your Airflow instance.

Database Specifications

Airflow uses SQLAlchemy and Object Relational Mapping (ORM) in Python to connect with the metadata database from the application layer. Any database supported by SQLAlchemy can theoretically be configured to host Airflow’s metadata. The most common databases used are:

  • Postgres
  • MySQL
  • MSSQL
  • SQLite

While SQLite is the default database in Apache Airflow, Postgres is by far the most common choice and is recommended for most use cases by the Airflow community. Astronomer uses Postgres for all of its Airflow environments, including local environments running with the Astro CLI and deployed environments on the cloud.

Note: When configuring a database backend, make sure your version is fully supported by checking the Airflow documentation.
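As an illustration of configuring a Postgres backend, the sketch below assembles a SQLAlchemy connection URL and prints it as the AIRFLOW__DATABASE__SQL_ALCHEMY_CONN environment variable, which Airflow 2.3 and later read (earlier 2.x versions use AIRFLOW__CORE__SQL_ALCHEMY_CONN). The helper name build_postgres_conn, the psycopg2 driver prefix, and the host and credentials are assumptions for illustration only.

```python
from urllib.parse import quote_plus


def build_postgres_conn(user: str, password: str, host: str,
                        port: int = 5432, database: str = "airflow") -> str:
    """Return a SQLAlchemy URL for a Postgres metadata database (hypothetical helper)."""
    # Percent-encode the credentials so characters such as '@' or '/' are not
    # mistaken for URL delimiters
    return (f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{database}")


if __name__ == "__main__":
    conn = build_postgres_conn("airflow", "p@ss/word", "db.example.internal")
    # Airflow 2.3+ reads this variable; earlier 2.x versions use
    # AIRFLOW__CORE__SQL_ALCHEMY_CONN instead
    print(f"export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN='{conn}'")
```

Percent-encoding the credentials matters because a password containing characters like @ or / would otherwise be parsed as part of the host or path.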

You should also consider the size of your metadata database when setting up your Airflow environment. Production environments typically use a managed database service, which includes features like autoscaling and automatic backups. The size you need will depend heavily on the workloads running in your Airflow instance. For reference, Apache Airflow uses a 2GB SQLite database by default, but this is intended for development purposes only. The Astro CLI starts Airflow environments with a 1GB Postgres database.

Changes to the Airflow metadata database configuration and its schema are very common and happen with almost every minor release. For this reason, you should not downgrade an Airflow instance in place on versions before Airflow 2.3. Airflow 2.3 added the airflow db downgrade command, which provides a supported way to downgrade the metadata database.

Note: Always back up your database before running any database operations!
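One way to make the backup step hard to skip is to script it in front of the downgrade. The sketch below assumes a Postgres metadata database and Airflow 2.3 or later; it builds a pg_dump command followed by airflow db downgrade --to-version, and by default only prints them (dry run). The helper names and the target version are hypothetical, so check the flags against airflow db downgrade --help for your version before running anything.

```python
import shlex
import subprocess
from datetime import datetime, timezone
from typing import List


def backup_and_downgrade_commands(db_url: str, target_version: str,
                                  backup_dir: str = ".") -> List[List[str]]:
    """Build the commands for a backup-first downgrade (hypothetical helper)."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    backup_file = f"{backup_dir}/airflow_metadata_{stamp}.dump"
    return [
        # 1. Dump the metadata database in pg_dump's custom format (restore with pg_restore)
        ["pg_dump", "--format=custom", f"--file={backup_file}", f"--dbname={db_url}"],
        # 2. Downgrade the schema to the target Airflow version (Airflow 2.3+)
        ["airflow", "db", "downgrade", "--to-version", target_version, "--yes"],
    ]


def run_commands(commands: List[List[str]], dry_run: bool = True) -> None:
    """Print each command; execute it only when dry_run is False."""
    for cmd in commands:
        print(shlex.join(cmd))
        if not dry_run:
            # check=True raises on failure, so a failed backup stops the downgrade
            subprocess.run(cmd, check=True)


if __name__ == "__main__":
    cmds = backup_and_downgrade_commands(
        "postgresql://user:pass@localhost:5432/airflow", target_version="2.3.0")
    run_commands(cmds)  # dry run: prints the commands only
```

Because subprocess.run is called with check=True, a failed backup raises an exception and the downgrade never starts.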

Content of the Metadata Database

Note: For many use cases, you can access contents of the metadata database through the Airflow UI or the stable REST API. These points of access are always preferable to querying the metadata database directly!

There are several types of metadata stored in the metadata database.

  • User login information and permissions.
  • Information used in DAGs, like variables, connections and XComs.
  • Data about DAG and task runs which are generated by the scheduler.
  • Other minor tables, such as tables which store DAG code in different formats or information about import errors.

Most of this data can be queried using the Airflow REST API.

User Information (Security)

A set of tables stores information about Airflow users, including their permissions for various Airflow features. As an admin user, you can access some of the content of these tables in the Airflow UI under the Security tab.

DAG Configurations and Variables (Admin)

DAGs can retrieve and use a variety of information from the metadata database, such as variables, connections, and XComs.

The information in these tables can be viewed and modified under the Admin tab in the Airflow UI.

DAG and Task Runs (Browse)

The scheduler depends on the Airflow metadata database to keep track of past and current events. The majority of this data can be found under the Browse tab in the Airflow UI.

  • DAG Runs stores information on all past and current DAG runs including whether they were successful, whether they were scheduled or manually triggered, and detailed timing information.
  • Jobs contains data used by the scheduler to store information about past and current jobs of different types (SchedulerJob, TriggererJob, LocalTaskJob).
  • Audit logs shows events of various types that were logged to the metadata database (for example, DAGs being paused or tasks being run).
  • Task Instances contains a record of every task run with a variety of attributes such as the priority weight, duration, or the URL to the task log.
  • Task Reschedule lists tasks that have been rescheduled.
  • Triggers shows all currently running triggers.
  • SLA Misses keeps track of tasks that missed their SLA.
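If you want the same DAG run data shown under Browse in a script, the stable REST API exposes it at GET /api/v1/dags/{dag_id}/dagRuns. The sketch below assumes you already have the JSON response as a Python dict and tallies runs by how they were triggered (run_type) and by outcome (state). The function name is hypothetical, and the sample payload is made up and abbreviated.

```python
from collections import Counter


def summarize_dag_runs(payload: dict) -> Counter:
    """Count DAG runs by (run_type, state) in a dagRuns list response."""
    return Counter((run["run_type"], run["state"]) for run in payload["dag_runs"])


if __name__ == "__main__":
    # Made-up, abbreviated payload; real responses include more fields per run
    sample = {
        "dag_runs": [
            {"dag_run_id": "run_1", "run_type": "scheduled", "state": "success"},
            {"dag_run_id": "run_2", "run_type": "manual", "state": "failed"},
            {"dag_run_id": "run_3", "run_type": "scheduled", "state": "success"},
        ],
        "total_entries": 3,
    }
    for (run_type, state), count in sorted(summarize_dag_runs(sample).items()):
        print(f"{run_type:<10} {state:<8} {count}")
```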

Other Tables

There are additional tables in the metadata database that store data ranging from DAG tags and serialized DAG code to import errors and the current states of sensors. Some of the information in these tables is visible in various places in the Airflow UI:

  • The source code of DAGs can be found by clicking on a DAG name from the main view and then going to the Code view.
  • Import errors appear at the top of the DAGs view in the UI.
  • DAG tags will appear underneath their respective DAG with a cyan background.

Airflow Metadata Database Best Practices

  1. When upgrading or downgrading Airflow, always follow the recommended steps for changing Airflow versions: back up the metadata database, check for deprecated features, pause all DAGs, and make sure no tasks are running.

  2. Use caution when pruning old records from your database with the airflow db clean command. This command deletes records older than the timestamp passed to --clean-before-timestamp, either from all eligible metadata database tables or from a specified list of tables. Pruning records can affect future runs, for example of tasks that use the depends_on_past argument.

  3. Accessing the metadata database from within a DAG (for example, fetching a variable, pulling from XCom, or using a connection ID) requires compute resources and a database connection. It is therefore best practice to keep these actions inside tasks, so that a connection to the database is created only while the task runs. If you write them as top-level code, a connection is created every time the scheduler parses the DAG file, which is every 30 seconds by default!

  4. Storage space in the Airflow metadata database can be limited depending on your setup, and running low on space in your metadata database can cause performance issues in Airflow. This is one of the many reasons we strongly advise against moving large amounts of data through XCom, and recommend a cleanup and archiving mechanism in any production deployment.

  5. Since the metadata database is critical for the scalability and resiliency of your Airflow deployment, it is best practice to use a managed database service for production environments, for example AWS RDS or Google Cloud SQL.
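To apply practice 2 with a safety margin, you might generate the airflow db clean command from a retention period and start with a dry run. The sketch below assumes a 90-day retention window and the example table names log and xcom; the helper name is hypothetical, so confirm the available flags and table names with airflow db clean --help for your Airflow version.

```python
import shlex
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Sequence


def db_clean_command(retention_days: int,
                     tables: Optional[Sequence[str]] = None,
                     dry_run: bool = True,
                     now: Optional[datetime] = None) -> List[str]:
    """Build an `airflow db clean` command that keeps the last retention_days days."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)
    cmd = ["airflow", "db", "clean",
           "--clean-before-timestamp", cutoff.isoformat(timespec="seconds")]
    if tables:
        # restrict the cleanup to the listed tables (comma-separated)
        cmd += ["--tables", ",".join(tables)]
    if dry_run:
        # only report what would be deleted; nothing is removed
        cmd.append("--dry-run")
    return cmd


if __name__ == "__main__":
    print(shlex.join(db_clean_command(90, tables=["log", "xcom"])))
```

Reviewing the --dry-run output first shows what would be deleted, which is the point at which to check for effects on tasks that use depends_on_past.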

Examples: Programmatically Access the Metadata Database

When possible, the best methods for retrieving data from the metadata database are the Airflow UI or a GET request to the stable Airflow REST API. Between the UI and the API, much of the metadata database can be viewed without the risk inherent in direct querying. For use cases where neither the Airflow UI nor the REST API provides sufficient data, we recommend using SQLAlchemy to query the metadata database. This provides an additional layer of abstraction on top of the tables, which makes your code less sensitive to minor schema changes.

In this section of the guide we provide a few examples for how you can retrieve specific information from the metadata database.

Example: Retrieving Number of Successfully Completed Tasks

A common reason users may want to access the metadata database is to get metrics like the total count of successfully completed tasks.

Using the stable REST API to query the metadata database is the recommended way to programmatically retrieve this information. Make sure you have correctly authorized API use in your Airflow instance and set the ENDPOINT_URL to the correct location (for local development: http://localhost:8080/).

The Python script below uses the requests library to make a GET request to the Airflow API for all successful (state=success) task instances of all DAG runs of all DAGs in the Airflow instance; the tilde (~) is shorthand for "all". Authentication uses a username and password stored as environment variables. The total_entries property of the JSON response contains the count of all successfully completed tasks.

# import the requests library
import requests
import os

# provide the location of your Airflow instance
ENDPOINT_URL = "http://localhost:8080/"

# in this example, env variables were used to store login information
# you will need to provide your own credentials
user_name = os.environ['USERNAME_AIRFLOW_INSTANCE']
password = os.environ['PASSWORD_AIRFLOW_INSTANCE']

# query the API for successful task instances from all DAGs and all DAG runs (~)
# ENDPOINT_URL already ends with "/", so the path starts without one
req = requests.get(
  f"{ENDPOINT_URL}api/v1/dags/~/dagRuns/~/taskInstances?state=success",
  auth=(user_name, password))
# raise an error for failed requests (for example, wrong credentials)
req.raise_for_status()

# from the API response, print the value of "total_entries"
print(req.json()['total_entries'])

You can also navigate to Browse -> Task Instances in the Airflow UI and filter for task instances with a state of success. The Record Count is shown on the right side of your screen.
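The script above only prints total_entries. If you need the task instance records themselves, keep in mind that list endpoints return results one page at a time, controlled by the limit and offset query parameters. The sketch below walks through all pages; fetch_page is a hypothetical callable you supply, which takes (limit, offset) and returns the parsed JSON of one page.

```python
from typing import Callable, Dict, Iterator


def iter_task_instances(fetch_page: Callable[[int, int], Dict],
                        page_size: int = 100) -> Iterator[Dict]:
    """Yield every task instance from a paginated taskInstances list endpoint."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        items = page["task_instances"]
        yield from items
        offset += len(items)
        # stop on an empty page or once every reported entry has been seen
        if not items or offset >= page["total_entries"]:
            break


if __name__ == "__main__":
    # offline demo with a fake fetch_page that serves 250 records in pages
    records = [{"task_id": f"task_{i}", "state": "success"} for i in range(250)]

    def fake_fetch(limit: int, offset: int) -> Dict:
        return {"task_instances": records[offset:offset + limit],
                "total_entries": len(records)}

    print(sum(1 for _ in iter_task_instances(fake_fetch)))  # 250
```

Against a live instance, fetch_page could wrap requests.get on f"{ENDPOINT_URL}api/v1/dags/~/dagRuns/~/taskInstances" with params={"state": "success", "limit": limit, "offset": offset} and your auth tuple, and return the response's .json(). Stopping on an empty page guards against an endless loop if total_entries changes while you are paging.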


Example: Pause and Unpause a DAG

Pausing and unpausing DAGs is a common action when running Airflow. You can toggle DAGs manually in the Airflow UI, but depending on your use case and the number of DAGs you want to toggle, this can be tedious. The Airflow REST API offers a simple way to pause and unpause DAGs by sending a PATCH request.

The Python script below sends a PATCH request to the Airflow API to update the entry for the DAG with a specific ID (here example_dag_basic). The update_mask=is_paused query parameter limits the update to the is_paused field, and the JSON body sets is_paused to True, which pauses the DAG. To unpause the DAG, set is_paused to False.

# import the requests library
import requests
import os

# provide the location of your Airflow instance
ENDPOINT_URL = "http://localhost:8080/"

# in this example, env variables were used to store login information
# you will need to provide your own credentials
user_name = os.environ['USERNAME_AIRFLOW_INSTANCE']
password = os.environ['PASSWORD_AIRFLOW_INSTANCE']

# data to update: True pauses the DAG, False unpauses it
update = {"is_paused": True}
# specify the DAG to pause/unpause
dag_id = "example_dag_basic"

# send a PATCH request that only updates the is_paused field of this DAG
req = requests.patch(
  f"{ENDPOINT_URL}api/v1/dags/{dag_id}?update_mask=is_paused", json=update,
  auth=(user_name, password))

# print the API response
print(req.text)
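To toggle many DAGs at once, you can loop over DAG IDs and send one PATCH request per DAG. The sketch below takes a session object with a requests-style patch method, for example a requests.Session whose auth attribute is set to your credentials. The function name and return format are illustrative assumptions.

```python
from typing import Dict, Iterable


def set_paused(session, endpoint_url: str, dag_ids: Iterable[str],
               paused: bool = True) -> Dict[str, int]:
    """PATCH is_paused on each DAG and return {dag_id: HTTP status code}.

    `session` is assumed to behave like requests.Session with auth already set;
    `endpoint_url` is assumed to end with "/", as in the examples above.
    """
    results = {}
    for dag_id in dag_ids:
        resp = session.patch(
            f"{endpoint_url}api/v1/dags/{dag_id}",
            params={"update_mask": "is_paused"},  # only change this field
            json={"is_paused": paused},
        )
        results[dag_id] = resp.status_code
    return results
```

Collecting the status code per DAG, instead of stopping at the first error, makes it easy to report which DAG IDs failed (for example, a 404 for a misspelled ID).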

Example: Delete a DAG

You can delete the metadata of a DAG either by clicking the trash can icon in the Airflow UI or by sending a DELETE request to the Airflow REST API. You cannot delete a DAG while it is still running. Deleting a DAG also does not delete the Python file in which the DAG is defined, so the DAG will reappear in your UI, with no history, the next time the scheduler parses the /dags folder.

The Python script below sends a DELETE request to a DAG with a specific ID (here: dag_to_delete).

# import the requests library
import requests
import os

# provide the location of your Airflow instance
ENDPOINT_URL = "http://localhost:8080/"

# in this example, env variables were used to store login information
# you will need to provide your own credentials
user_name = os.environ['USERNAME_AIRFLOW_INSTANCE']
password = os.environ['PASSWORD_AIRFLOW_INSTANCE']

# specify which DAG to delete
dag_id = 'dag_to_delete'

# send the deletion request (ENDPOINT_URL already ends with "/")
req = requests.delete(
  f"{ENDPOINT_URL}api/v1/dags/{dag_id}",
  auth=(user_name, password))

# print the API response
print(req.text)
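Because the deletion fails while the DAG is still running, a script may want to check first. The sketch below assumes a requests-style session with credentials set and asks GET /api/v1/dags/{dag_id}/dagRuns?state=running whether any run is active before sending the DELETE request. The helper names are hypothetical, and the check is not atomic: a run could start between the two requests.

```python
def has_running_dag_runs(session, endpoint_url: str, dag_id: str) -> bool:
    """Return True if the DAG has at least one DAG run in the running state."""
    resp = session.get(f"{endpoint_url}api/v1/dags/{dag_id}/dagRuns",
                       params={"state": "running", "limit": 1})
    resp.raise_for_status()
    # total_entries counts all matching runs, regardless of the page limit
    return resp.json()["total_entries"] > 0


def delete_dag_if_idle(session, endpoint_url: str, dag_id: str) -> bool:
    """Delete the DAG's metadata only if no run is active; return True if deleted."""
    if has_running_dag_runs(session, endpoint_url, dag_id):
        return False
    resp = session.delete(f"{endpoint_url}api/v1/dags/{dag_id}")
    resp.raise_for_status()
    return True
```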

Example: Retrieve all DAG dependencies

Cross-DAG dependencies are a powerful tool to take your data orchestration to the next level. Retrieving data about cross-DAG dependencies from the metadata database can be useful for programmatically implementing custom solutions, for example to make sure downstream DAGs are paused if an upstream DAG is paused. These dependencies can be visualized in the Airflow UI under Browse -> DAG Dependencies, but they are not accessible through the Airflow REST API.

To programmatically access this information, you can use SQLAlchemy with Airflow models to access data from the metadata database. Note that if you are running Airflow in a Dockerized setting, you have to run the script below from within your scheduler container.

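from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from airflow.models.serialized_dag import SerializedDagModel
import os

# retrieve your SQLAlchemy connection string
# if you are using the Astro CLI, this env variable will be set up automatically
# (Airflow 2.3+ prefers AIRFLOW__DATABASE__SQL_ALCHEMY_CONN)
sql_alchemy_conn = (os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_CONN')
                    or os.environ['AIRFLOW__CORE__SQL_ALCHEMY_CONN'])

# append the database name; skip this if your connection string already contains one
conn_url = f'{sql_alchemy_conn}/postgres'

engine = create_engine(conn_url)

with Session(engine) as session:
    # get_dag_dependencies() is a classmethod that returns the dependencies of all DAGs;
    # passing the session makes it query through the engine created above
    print(SerializedDagModel.get_dag_dependencies(session=session))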
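Once you have the dependencies, a common next step, such as finding every DAG downstream of a paused one, is a graph traversal. The breadth-first search below assumes you have already reduced the DagDependency objects to (source, target) pairs of DAG IDs, for example by reading the source and target attributes of every DagDependency in the returned dict. Depending on your Airflow version, some dependency types (such as datasets) may use placeholder nodes rather than DAG IDs, so filter those out first. The function name is hypothetical.

```python
from collections import defaultdict, deque
from typing import Dict, Iterable, Set, Tuple


def downstream_dags(edges: Iterable[Tuple[str, str]], start: str) -> Set[str]:
    """Return every DAG ID reachable downstream of `start` (excluding `start`)."""
    # adjacency list: upstream DAG ID -> set of directly downstream DAG IDs
    graph: Dict[str, Set[str]] = defaultdict(set)
    for source, target in edges:
        graph[source].add(target)

    # breadth-first search from the starting DAG
    seen: Set[str] = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in graph[node]:
            if child != start and child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


if __name__ == "__main__":
    # hypothetical dependency pairs: ingest -> transform -> report, plus ingest -> audit
    pairs = [("ingest", "transform"), ("transform", "report"), ("ingest", "audit")]
    print(sorted(downstream_dags(pairs, "ingest")))  # ['audit', 'report', 'transform']
```

Each DAG ID in the result could then be passed to a pause request like the one in the PATCH example above.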

Example: Retrieving Alembic Version

In very rare cases, you might need a value from the metadata database which is not accessible through any of the methods we’ve discussed. In this case, you can query the metadata database directly. Before you do so, remember that you can corrupt your Airflow instance by directly manipulating the metadata database, especially if the schema changes between upgrades.

The query below retrieves the current Alembic version ID, which is not accessible through any of the recommended ways of interacting with the metadata database. Database administrators might need the version ID for complex data migration operations.

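from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
import os

# retrieve your SQLAlchemy connection string
# if you are using the Astro CLI, this env variable will be set up automatically
# (Airflow 2.3+ prefers AIRFLOW__DATABASE__SQL_ALCHEMY_CONN)
sql_alchemy_conn = (os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_CONN')
                    or os.environ['AIRFLOW__CORE__SQL_ALCHEMY_CONN'])

# append the database name; skip this if your connection string already contains one
conn_url = f'{sql_alchemy_conn}/postgres'

engine = create_engine(conn_url)

# this is a direct query to the metadata database: use at your own risk!
# text() marks the string as raw SQL (plain strings are deprecated in SQLAlchemy 1.4
# and rejected in 2.0)
stmt = text("""SELECT version_num
        FROM alembic_version;""")

with Session(engine) as session:
    result = session.execute(stmt)
    # the table holds a single row with the current revision ID
    print(result.scalar_one())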
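If your metadata database is SQLite (Airflow's default, typically stored as airflow.db in your AIRFLOW_HOME), you can reduce the risk of direct queries by opening the file read-only. The sketch below uses Python's built-in sqlite3 module with the mode=ro URI option, so any accidental write fails instead of modifying the database, and a wrong path raises an error instead of silently creating an empty database. The function name is hypothetical; on Postgres, connecting as a role that only has SELECT privileges gives a similar safeguard.

```python
import sqlite3
from pathlib import Path


def read_alembic_version(db_path: str) -> str:
    """Return the Alembic revision stored in a SQLite metadata database."""
    # mode=ro opens the file read-only, so an accidental write raises an error
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    try:
        row = conn.execute("SELECT version_num FROM alembic_version").fetchone()
        return row[0]
    finally:
        conn.close()
```

For example, read_alembic_version(os.path.expanduser("~/airflow/airflow.db")) would return the same revision ID as the query above, assuming the default AIRFLOW_HOME and a SQLite backend.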