Load data to MongoDB with Apache Airflow
MongoDB is an open-source, general-purpose database built by developers, for developers. MongoDB's popularity is driven by its flexible document schemas and horizontal scalability. By leveraging the Mongo provider, you can easily orchestrate many use cases with Airflow, such as:
- Machine learning pipelines.
- Automating database administration operations.
- Batch data pipelines.
In this tutorial, you'll learn how to use Airflow to load data from an API into MongoDB.
This tutorial was developed in partnership with MongoDB. For more details on this integration, including additional instructions on using MongoDB Charts, check out MongoDB's post Using MongoDB with Apache Airflow.
Time to complete
This tutorial takes approximately 30 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of MongoDB. See Getting started.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
- Airflow connections. See Managing your connections in Apache Airflow.
Prerequisites
- A MongoDB cluster. Astronomer recommends using MongoDB Atlas, a hosted MongoDB cluster with integrated data services that offers a free trial. See Getting started with MongoDB Atlas.
- The Astro CLI.
Step 1: Configure your MongoDB Atlas cluster and database
First you will need to configure your MongoDB Atlas cluster so Airflow can connect to it.
- In your MongoDB Atlas account under Security, go to Database Access and create a database user with a password. Make sure the user has privileges to write data to the database.
- Go to Security -> Network Access and add your public IP address to the IP access list. You can find your public IP address on Mac and Linux by running curl ifconfig.co/, or on Windows by running ipconfig /all.
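Optionally, you can confirm that the database user and network access rules work before moving on to Airflow by connecting to the cluster from your machine. The following is a minimal sketch using the pymongo driver (pip install pymongo); the URI, user name, and password are placeholders to replace with your own Atlas values.

# connection_check.py - optional local sanity check for Step 1.
# Replace the placeholder URI with the "mongodb+srv://..." string from the Atlas Connect dialog.
from pymongo import MongoClient

uri = "mongodb+srv://<db_user>:<db_password>@<your-cluster-hostname>/?retryWrites=true&w=majority"
client = MongoClient(uri)

# The ping command only succeeds if the database user and IP access list are configured correctly.
client.admin.command("ping")
print("Successfully connected to MongoDB Atlas")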
Step 2: Configure your Astro project
Use the Astro CLI to create and run an Airflow project locally.
- Create a new Astro project:

  $ mkdir astro-mongodb-tutorial && cd astro-mongodb-tutorial
  $ astro dev init

- Add the following line to the requirements.txt file of your Astro project:

  apache-airflow-providers-mongo==3.0.0

  This installs the Mongo provider package that contains all of the relevant MongoDB modules.

- Run the following command to start your project in a local environment:

  astro dev start
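If you want to confirm that the Mongo provider is available in your local Airflow environment, one option (assuming a recent Astro CLI version, which includes the astro dev bash command for opening a shell in a local Airflow container) is to list the installed providers:

$ astro dev bash          # open a shell in a local Airflow container
$ airflow providers list  # apache-airflow-providers-mongo should appear in the output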
Step 3: Configure your Airflow connections
In this step, you'll configure two Airflow connections: one for MongoDB and one for the API that provides the sample data.
- In the Airflow UI, go to Admin -> Connections.

- Create a new connection named mongo_default and choose the MongoDB connection type. Enter the following information:

  - Host: Your MongoDB Atlas host name
  - Login: Your database user ID
  - Password: Your database user password
  - Extra: {"srv": true}
If you don't know your MongoDB Atlas host name, go to your database in the Atlas UI and click Connect. Any of the connection options in this section will give you a connection URI that includes your host name. For more on connecting to your MongoDB cluster, see Connect to a database deployment.
- Create a second connection named http_default and choose the HTTP connection type. Enter the following information:

  - Host: api.frankfurter.app

  This is the API you will gather data from to load into MongoDB. You can also replace this connection with a different API of your choosing.
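As an alternative to configuring these connections in the Airflow UI, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID>. A minimal sketch using Airflow's connection URI format, with placeholder host and credentials, is to add lines like the following to the .env file of your Astro project. Note that connections defined this way do not show up in the Airflow UI, and any special characters in the password must be URL-encoded.

AIRFLOW_CONN_MONGO_DEFAULT=mongo://<db_user>:<db_password>@<your-cluster-hostname>/?srv=true
AIRFLOW_CONN_HTTP_DEFAULT=http://api.frankfurter.app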
Step 4: Create your DAG
In your Astro project dags folder, create a new file called mongo-pipeline.py. Paste the following code into the file:
import json

from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.mongo.hooks.mongo import MongoHook
from pendulum import datetime


@dag(
    dag_id="load_data_to_mongodb",
    schedule=None,
    start_date=datetime(2022, 10, 28),
    catchup=False,
    default_args={
        "retries": 0,
    },
)
def load_data_to_mongodb():
    t1 = SimpleHttpOperator(
        task_id="get_currency",
        method="GET",
        endpoint="2022-01-01..2022-06-30",
        headers={"Content-Type": "application/json"},
        do_xcom_push=True,
    )

    @task
    def uploadtomongo(result):
        hook = MongoHook(mongo_conn_id="mongo_default")
        client = hook.get_conn()
        db = client.MyDB  # Replace "MyDB" if you want to load data to a different database
        currency_collection = db.currency_collection
        print(f"Connected to MongoDB - {client.server_info()}")
        d = json.loads(result)
        currency_collection.insert_one(d)

    t1 >> uploadtomongo(result=t1.output)


load_data_to_mongodb()
This DAG gets currency data from an API using the SimpleHttpOperator and loads the data into MongoDB using the MongoHook inside a @task-decorated Python function (Airflow's TaskFlow equivalent of the PythonOperator). The data is loaded as a new collection in a database called MyDB.
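Keep in mind that insert_one adds a new document on every DAG run, so rerunning the DAG creates duplicates in currency_collection. If you want reruns to overwrite the previous load instead, one variation is to replace the final line of uploadtomongo with an upsert keyed on the query window (the date_range key below is just an illustrative choice, not part of the original tutorial):

        # Upsert: replace the existing document for this date range if present, otherwise insert it.
        # "date_range" is an illustrative key added to make reruns idempotent.
        d["date_range"] = "2022-01-01..2022-06-30"
        currency_collection.replace_one({"date_range": d["date_range"]}, d, upsert=True)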
Step 5: Run the DAG and review the data
Go to the Airflow UI, unpause your load_data_to_mongodb DAG, and trigger it to grab data from the currency API and load it to your MongoDB cluster.
In the MongoDB Atlas UI, go to your cluster and click Collections to view the data you just loaded.
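You can also verify the load from the command line instead of the Atlas UI. The snippet below is a minimal sketch that assumes pymongo is installed locally and reuses a placeholder Atlas connection string:

from pymongo import MongoClient

# Placeholder URI - use the connection string from the Atlas Connect dialog.
client = MongoClient("mongodb+srv://<db_user>:<db_password>@<your-cluster-hostname>/?retryWrites=true&w=majority")
collection = client["MyDB"]["currency_collection"]

print("Documents in currency_collection:", collection.count_documents({}))
print("Sample document:", collection.find_one())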
Conclusion
Congratulations! You now know how to use Airflow to load data to your MongoDB cluster. A great next step is to analyze that data using MongoDB Charts in MongoDB Atlas. For more on this, see MongoDB's complementary tutorial.