Get started with Apache Airflow, Part 2: Providers, connections, and variables

Use this tutorial after completing Part 1: Write your first DAG to learn how to connect Airflow to external systems.

After you complete this tutorial, you’ll be able to:

  • Add an Airflow provider package to your Astro project.
  • Create and use an Airflow variable.
  • Create an Airflow connection to an external service and use it in a DAG.

Time to complete

This tutorial takes approximately 30 minutes to complete.

Assumed knowledge

To complete this tutorial, you’ll need to know:

  • The basics of writing and running DAGs, covered in Part 1: Write your first DAG.
  • The basics of Python.

Prerequisites

  • The Astro CLI version 1.34.0 or later.
  • The completed project from Part 1: Write your first DAG. To jump directly into this tutorial, create a new Astro project and copy the code at the end of Step 6 into your project as a new DAG.

Step 1: Create your DAG

In this second part of Astronomer’s introduction to Airflow, you’ll add a third DAG to your Astro project. The new DAG interacts with the Open Notify API to print the location of the International Space Station (ISS) to your task logs.

  1. Create a new Python file in the dags directory of your Astro project called find_the_iss.py.
  2. Copy and paste the code below into find_the_iss.py.
Click to view the full DAG code
"""
## Find the International Space Station

This DAG pulls the current location of the International Space Station from an API
and prints it to the logs.

This DAG needs an HTTP connection with the name `open_notify_api_conn`
and the host `http://api.open-notify.org` to work.
"""

from airflow.sdk import chain, dag
from airflow.decorators import task
from airflow.providers.http.operators.http import HttpOperator
from airflow.models import Variable
from pendulum import datetime
import logging

task_logger = logging.getLogger("airflow.task")

MY_ENDPOINT = Variable.get(
    "my_endpoint", "NOT SET"
)  # This is the variable you created in the Airflow UI!


@dag(
    start_date=datetime(2025, 3, 1),
    schedule="@daily",
    doc_md=__doc__,
    default_args={"owner": "airflow", "retries": 3},
    tags=["Connections"],
)
def find_the_iss():

    get_iss_coordinates = HttpOperator(
        task_id="get_iss_coordinates",
        http_conn_id="open_notify_api_conn",
        endpoint=MY_ENDPOINT,
        method="GET",
        log_response=True,
    )

    @task
    def log_iss_location(location: str) -> dict:
        """
        This task prints the current location of the International Space Station to the logs.
        Args:
            location (str): The JSON response from the API call to the Open Notify API.
        Returns:
            dict: The JSON response from the API call to the Reverse Geocode API.
        """
        import requests
        import json

        location_dict = json.loads(location)

        lat = location_dict["iss_position"]["latitude"]
        lon = location_dict["iss_position"]["longitude"]

        r = requests.get(
            f"https://api.bigdatacloud.net/data/reverse-geocode-client?latitude={lat}&longitude={lon}"
        ).json()

        country = r["countryName"]
        city = r["locality"]

        task_logger.info(
            f"The International Space Station is currently over {city} in {country}."
        )

        return r

    # calling the @task decorated task with the output of the get_iss_coordinates task
    log_iss_location_obj = log_iss_location(get_iss_coordinates.output)

    chain(get_iss_coordinates, log_iss_location_obj)


find_the_iss()

Step 2: Add a provider package

  1. If your Airflow project is not running locally yet, run astro dev start in your Astro project directory to start your Airflow environment.

  2. Open the Airflow UI to confirm that your DAG was pushed to your environment. On the Dags page, you should see a “DAG Import Error” like the one shown here:

    Screenshot of the Airflow UI Showing an Import Error saying: ModuleNotFoundError: No module named 'airflow.providers.http'

    This error is due to a missing provider package. Provider packages are Python packages maintained separately from core Airflow that contain hooks and operators for interacting with external services. You can browse all available providers in the Astronomer Registry.

    Your DAG uses operators from the HTTP provider, which is missing from your Airflow environment. Let’s fix that!

  3. Open the HTTP provider page in the Astronomer Registry.

  4. Copy the provider name and version by clicking Use Provider in the top right corner.

    Screenshot of the Astronomer Registry showing the HTTP provider page with the Use Provider button highlighted.

  5. Paste the provider name and version into the requirements.txt file of your Astro project. Make sure to only add apache-airflow-providers-http==<version> without pip install; see the example after this list.

  6. Restart your Airflow environment by running astro dev restart. Unlike DAG code changes, package dependency changes require a complete restart of Airflow.
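
For reference, the added line in your requirements.txt should look similar to the following. The version number here is only an example; use whatever version the Astronomer Registry shows:

# requirements.txt
apache-airflow-providers-http==5.0.0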

Step 3: Add an Airflow variable

After restarting your Airflow instance, you should no longer see the DAG import error from Step 2. Next, you need to add an Airflow variable to be used by the HttpOperator.

Airflow variables are key-value pairs that can be accessed from any DAG in your Airflow environment. The DAG code uses the variable my_endpoint with a default value of NOT SET, so you’ll need to create the variable and give it a value in the Airflow UI.

  1. Go to Admin > Variables to open the list of Airflow variables. Since no Airflow variables have been defined yet, it is empty.

    Screenshot of the Airflow UI with the Admin tab menu expanded to show the Variables option.

  2. Click on the + Add Variable button in the top right corner to open the form for adding a new variable. Set the Key for the variable as my_endpoint and set the Val to /iss-now.json. This is the endpoint of the Open Notify API that returns the current location of the ISS. The variable is used in the get_iss_coordinates task to specify the endpoint to query.

  3. Click Save.
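
If you prefer the command line, you can also manage Airflow variables with the Airflow CLI. The commands below are an optional alternative to the UI steps above; they assume you run them inside your local Airflow environment, for example after opening a shell in a container with astro dev bash:

airflow variables set my_endpoint "/iss-now.json"
airflow variables get my_endpoint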

Step 4: Create an HTTP connection

An Airflow connection is a set of configurations for connecting with an external tool in the data ecosystem. If you use a hook or operator that connects to an external system, it likely needs a connection.

  1. Click on Admin > Connections to open the list of Airflow connections. Since no Airflow connections have been defined yet, it is empty.
  2. Click + Add Connection to create a new connection.
  3. Name the connection open_notify_api_conn and select a Connection Type of HTTP.
  4. Enter the host URL for the API you want to query in the Host field. For this tutorial we use the Open Notify API, which has an endpoint returning the current location of the ISS. The host for this API is http://api.open-notify.org.
  5. Click Save.
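
Connections can also be defined outside of the Airflow UI. For example, Airflow reads environment variables named AIRFLOW_CONN_<CONN_ID>, so adding a line like the one below to the .env file of your Astro project would create an equivalent connection using Airflow’s JSON connection format. This is an optional alternative to the UI steps above; note that connections defined this way do not show up in the Connections list in the UI:

AIRFLOW_CONN_OPEN_NOTIFY_API_CONN='{"conn_type": "http", "host": "http://api.open-notify.org"}'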

Step 5: Review the DAG code

Now that your Airflow environment is configured correctly, look at the DAG code you copied in Step 1 to see how your new variable and connection are used at the code level.

At the top of the file, the DAG is described in a docstring. It’s highly recommended to always document your DAGs and include any additional connections or variables that are required for the DAG to work.

"""
## Find the International Space Station

This DAG pulls the current location of the International Space Station from an API
and prints it to the logs.

This DAG needs an HTTP connection with the name `open_notify_api_conn`
and the host `http://api.open-notify.org` to work.
"""

After the docstring, all necessary packages are imported. Notice how the HttpOperator is imported from a provider package rather than from core Airflow.

from airflow.sdk import chain, dag
from airflow.decorators import task
from airflow.providers.http.operators.http import HttpOperator
from airflow.models import Variable
from pendulum import datetime
import logging

Next, the Airflow task logger is instantiated and a top-level variable is defined. The variable MY_ENDPOINT is set to the value of the Airflow variable my_endpoint you defined in Step 3.

task_logger = logging.getLogger("airflow.task")

MY_ENDPOINT = Variable.get(
    "my_endpoint", "NOT SET"
)  # This is the variable you created in the Airflow UI!

The DAG itself is defined using the @dag decorator with the following parameters:

  • dag_id is not set explicitly, so it defaults to the name of the Python function, find_the_iss.
  • start_date is set to March 1st, 2025, which means the DAG starts to be scheduled after this date.
  • schedule is set to @daily, which means the DAG runs every day at 0:00 UTC. You can use any cron string or shorthand for time-based schedules; see the example after the code snippet below.
  • doc_md is set to the docstring of the DAG file to create DAG Docs you can view in the Airflow UI.
  • default_args is set to a dictionary with the key owner set to airflow and the key retries set to 3. The latter setting gives each task in this DAG 3 retries before failing, which is a common best practice to protect against transient failures.
  • tags adds the Connections tag to the DAG in the Airflow UI.
@dag(
    start_date=datetime(2025, 3, 1),
    schedule="@daily",
    doc_md=__doc__,
    default_args={"owner": "airflow", "retries": 3},
    tags=["Connections"],
)
def find_the_iss():
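
As noted in the schedule parameter description above, presets like @daily map to cron expressions. The snippet below is only an illustration of two equivalent ways to express this DAG’s schedule; either string can be passed to the schedule parameter of the @dag decorator:

schedule_preset = "@daily"   # preset shorthand
schedule_cron = "0 0 * * *"  # the equivalent cron expression: midnight UTC every day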

This DAG has two tasks:

  • The first task uses the HttpOperator to send a GET request to the /iss-now.json endpoint of the Open Notify API to retrieve the current location of the ISS. The response is logged to the Airflow task logs and pushed to the XCom table in the Airflow metadata database to be retrieved by downstream tasks.

    get_iss_coordinates = HttpOperator(
        task_id="get_iss_coordinates",
        http_conn_id="open_notify_api_conn",
        endpoint=MY_ENDPOINT,
        method="GET",
        log_response=True,
    )
  • The second task uses the TaskFlow API’s @task decorator to run a Python function that processes the coordinates returned by the get_iss_coordinates task and prints the city and country of the ISS’s location to the task logs. The coordinates are passed to the function as an argument using get_iss_coordinates.output, which accesses the data returned by the get_iss_coordinates task from XComs.

    These two tasks are an example of how you can use a traditional operator (HttpOperator) and a TaskFlow API task to perform similar operations, in this case querying an API. The best way to write tasks depends on your use case and often comes down to personal preference. A sketch of how the get_iss_coordinates task could be written with the TaskFlow API follows this list.

    @task
    def log_iss_location(location: str) -> dict:
        """
        This task prints the current location of the International Space Station to the logs.
        Args:
            location (str): The JSON response from the API call to the Open Notify API.
        Returns:
            dict: The JSON response from the API call to the Reverse Geocode API.
        """
        import requests
        import json

        location_dict = json.loads(location)

        lat = location_dict["iss_position"]["latitude"]
        lon = location_dict["iss_position"]["longitude"]

        r = requests.get(
            f"https://api.bigdatacloud.net/data/reverse-geocode-client?latitude={lat}&longitude={lon}"
        ).json()

        country = r["countryName"]
        city = r["locality"]

        task_logger.info(
            f"The International Space Station is currently over {city} in {country}."
        )

        return r

    # calling the @task decorated task with the output of the get_iss_coordinates task
    log_iss_location_obj = log_iss_location(get_iss_coordinates.output)
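
To illustrate how interchangeable the two approaches are, here is a sketch of how the get_iss_coordinates task could instead be written as a @task-decorated function. This sketch is not part of the tutorial DAG: the function name is made up, and it assumes it is defined inside find_the_iss so that MY_ENDPOINT and the imports at the top of the file are available. It reuses the open_notify_api_conn connection through the HttpHook from the same HTTP provider package:

@task
def get_iss_coordinates_taskflow() -> str:
    """Query the Open Notify API via the HTTP provider's hook and return the raw JSON string."""
    from airflow.providers.http.hooks.http import HttpHook

    hook = HttpHook(method="GET", http_conn_id="open_notify_api_conn")
    response = hook.run(endpoint=MY_ENDPOINT)
    return response.text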

Lastly, the dependency between the two tasks is set so that the log_iss_location task only runs after the get_iss_coordinates task is successful. This is done using the chain function. You can learn more about setting dependencies between tasks in the Manage task and task group dependencies in Airflow guide.

The last line of the DAG file calls the find_the_iss function to create the DAG.

    chain(get_iss_coordinates, log_iss_location_obj)

find_the_iss()

Step 6: Test your DAG

  1. Go to the DAGs view and unpause the find_the_iss DAG by clicking on the toggle to the left of the DAG name. The last scheduled DAG run automatically starts.

    DAG running

  2. Check the logs of the log_iss_location task to learn where the ISS is right now!

[2025-03-30, 17:52:19] INFO - The International Space Station is currently over Ta’if in Saudi Arabia.: source="airflow.task"
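
If you’re curious about the raw coordinates the get_iss_coordinates task retrieves, you can also query the same endpoint directly from any Python shell outside of Airflow. This is just a quick sanity check and assumes the requests package is installed:

import requests

# Same request the HttpOperator makes: connection host plus the my_endpoint variable value
print(requests.get("http://api.open-notify.org/iss-now.json").json())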

See also