Orchestrate MongoDB operations with Apache Airflow

MongoDB is a general-purpose document database that supports high-dimensional embeddings for text data and complex search queries. With the MongoDB Airflow provider, you can orchestrate interactions with MongoDB from your Airflow DAGs.

In this tutorial, you’ll use Apache Airflow, MongoDB, and OpenAI to create a pipeline to ingest, embed, store, and query video game descriptions in MongoDB.

Why use Airflow with MongoDB?

MongoDB can store and query high-dimensional embeddings for text data, which is useful for applications like recommendation systems. You can orchestrate all steps of the process with Airflow, from ingesting data to querying it.

By integrating MongoDB with Airflow, you can:

  • Use Airflow’s data-driven scheduling capabilities to trigger operations in MongoDB based on other events in your data ecosystem, such as the completion of a model training run or the successful ingestion of new text data.
  • Create pipelines interacting with MongoDB that adapt to changes in your data at runtime using dynamic Airflow tasks.
  • Add Airflow features such as retries, branching, and alerts to MongoDB operations for handling complex task dependencies or task failures.

Time to complete

This tutorial takes approximately 30 minutes to complete.

Assumed knowledge

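To get the most out of this tutorial, make sure you have an understanding of the basics of MongoDB and vector embeddings, as well as the Airflow features the DAG in Step 4 relies on: decorators, hooks, params, branching, dynamic task mapping, and sensors.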

Prerequisites

  • A MongoDB cluster. Astronomer recommends using MongoDB Atlas, a hosted MongoDB cluster with integrated data services that offers a free trial. See Getting started with MongoDB Atlas.
  • The Astro CLI.
  • An OpenAI API key of at least tier 1. If you do not want to use OpenAI, you will have to adjust the code in the embedding functions to use a different embedding service.
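If you only want to test the pipeline end to end without calling an embedding service, one option is a deterministic stand-in that returns vectors of the right length. The sketch below is an assumption for offline testing, not part of the tutorial: fake_embedding is a hypothetical helper, and its vectors carry no semantic meaning, so the query results will not be meaningful.

```python
import hashlib
import math
from typing import List


def fake_embedding(text: str, dimensions: int = 1536) -> List[float]:
    """Return a deterministic pseudo-embedding for `text` (hypothetical helper).

    The vector only has the right shape; it does not encode meaning.
    """
    values = []
    counter = 0
    while len(values) < dimensions:
        # Hash the text together with a counter to get 32 fresh bytes per round.
        digest = hashlib.sha256(f"{counter}:{text}".encode("utf-8")).digest()
        # Map each byte (0..255) to a float in [-1.0, 1.0].
        values.extend(byte / 127.5 - 1.0 for byte in digest)
        counter += 1
    values = values[:dimensions]
    # Normalize to unit length so cosine similarity stays well defined.
    norm = math.sqrt(sum(v * v for v in values)) or 1.0
    return [v / norm for v in values]
```

The default of 1536 dimensions matches the value the DAG in Step 4 uses for the search index, so a function like this could stand in for the OpenAI call while you check the rest of the pipeline.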

Step 1: Configure your MongoDB Atlas cluster

First you will need to configure your MongoDB Atlas cluster so Airflow can connect to it.

  1. In your MongoDB Atlas account under Security, go to Database Access and create a database user with a password. Make sure the user has privileges to write data to the database, and save the password in a secure location, as you will need it later.

    Screenshot of the MongoDB Atlas UI showing how to add a new user.

  2. If you haven’t already done so during your MongoDB setup, go to Security -> Network Access and add your public IP address to the IP access list. You can find your public IP address on Mac and Linux by running curl ifconfig.co/. On Windows, ipconfig /all lists the addresses of your network adapters, which match your public IP address only if your machine is not behind a router or NAT.

Step 2: Configure your Astro project

Use the Astro CLI to create and run an Airflow project locally.

  1. Create a new Astro project:

    $ mkdir astro-mongodb-tutorial && cd astro-mongodb-tutorial
    $ astro dev init

  2. Add the following lines to the requirements.txt file of your Astro project:

    apache-airflow-providers-mongo==4.2.2
    openai==1.52.0

    This installs the Mongo provider package that contains all of the relevant MongoDB modules for Airflow, as well as the OpenAI package for embedding text data.

  3. In the .env file add the following environment variables:

    AIRFLOW_CONN_MONGODB_DEFAULT='{
    "conn_type": "mongo",
    "host": "<your_cluster>.mongodb.net",
    "login": "<your_user>",
    "password": "<your_password>",
    "extra": {
    "srv": "true",
    "ssl": "true"
    }
    }'
    OPENAI_API_KEY="sk-<your_openai_api_key>"

    Replace <your_openai_api_key> with your OpenAI API key and <your_cluster>, <your_user>, and <your_password> with your MongoDB Atlas cluster name, database user, and database user password created in Step 1. You can find your MongoDB Atlas cluster name in the Atlas UI by clicking on Connect in your cluster overview.

The AIRFLOW_CONN_MONGODB_DEFAULT environment variable is used to create a connection to your MongoDB cluster in Airflow with the connection ID mongodb_default. In the MongoDB Airflow provider version 4.2.2 and later, it is also possible to set the connection in the Airflow UI. To do so, provide the following connection details:

  • Connection Id: mongodb_default
  • Connection Type: MongoDB
  • Host: <your_cluster>.mongodb.net
  • Username: <your_user>
  • Password: <your_password>
  • SRV Connection: Checked
  • Use SSL: Checked

Leave all other fields blank.
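If you prefer not to write the connection JSON by hand, you could generate the value for AIRFLOW_CONN_MONGODB_DEFAULT with a few lines of Python. The following is a minimal sketch; build_mongo_conn_json is a hypothetical helper name, and the fields simply mirror the ones shown in the .env example above.

```python
import json


def build_mongo_conn_json(cluster_host: str, user: str, password: str) -> str:
    """Serialize the connection fields used in this tutorial to a JSON string."""
    conn = {
        "conn_type": "mongo",
        "host": cluster_host,
        "login": user,
        "password": password,
        "extra": {"srv": "true", "ssl": "true"},
    }
    # json.dumps escapes double quotes and backslashes inside the values.
    return json.dumps(conn)


if __name__ == "__main__":
    value = build_mongo_conn_json(
        "<your_cluster>.mongodb.net", "<your_user>", "<your_password>"
    )
    # json.loads raises an error if the value is not valid JSON.
    assert json.loads(value)["conn_type"] == "mongo"
    print(f"AIRFLOW_CONN_MONGODB_DEFAULT='{value}'")
```

Generating the value this way guards against JSON syntax slips such as a missing comma. Note that a password containing a single quote would still clash with the single quotes wrapping the value in the .env file.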

Step 3: Add your data

The DAG in this tutorial runs a query on vectorized game descriptions.

Create a new file called games.txt in the include directory, then copy and paste the following information:

1 ::: Minecraft (2009) ::: sandbox ::: In a blocky, procedurally-generated world, players explore, gather resources, craft tools, and build structures, with the option to fight off monsters and explore vast environments.
2 ::: The Sims 2 (2004) ::: life simulation ::: Players create and control characters, managing their lives, relationships, and homes in a virtual world, while guiding them through everyday tasks and fulfilling personal ambitions.
3 ::: Call of Duty: Modern Warfare 2 (2009) ::: shooter ::: In this intense military first-person shooter, players join elite military operations across the globe, fighting in a fast-paced war against a dangerous enemy.
4 ::: Halo 3 (2007) ::: sci-fi shooter ::: Master Chief returns to finish the fight against the Covenant and the Flood, battling across futuristic environments in a war to save humanity.
5 ::: Star Wars: Battlefront 2 (2005) ::: action shooter ::: Players fight in large-scale battles across iconic Star Wars locations, engaging in both ground and space combat to claim victory.
6 ::: Age of Mythology (2002) ::: real-time strategy ::: Players command armies of mythological creatures and heroes, leveraging the powers of gods to wage war in an ancient, myth-inspired world.
7 ::: Stronghold (2001) ::: real-time strategy ::: In a medieval world, players build and manage castles, control armies, and lay siege to enemies, balancing economic management with military strategy.
8 ::: Command & Conquer: Tiberium Wars (2007) ::: real-time strategy ::: In a futuristic setting, players lead global military factions battling over the alien resource Tiberium, engaging in fast-paced tactical warfare.
9 ::: Minesweeper (1990) ::: puzzle ::: In this classic puzzle game, players use logic to uncover hidden mines on a grid without triggering any explosions.
10 ::: Addy Junior (2000) ::: educational ::: An educational game designed to help children improve reading, math, and problem-solving skills through engaging, playful activities.
11 ::: Impossible Creatures (2003) ::: real-time strategy ::: Players design and combine creatures using DNA from various animals, creating unique hybrids to lead in battle across an alternate 1930s world.
12 ::: World of Warcraft (2004) ::: MMORPG ::: Players explore a vast fantasy world, completing quests, battling enemies, and forming alliances in an ever-changing landscape of adventure.
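Each line in games.txt follows the pattern id ::: title (year) ::: genre ::: description. To make the format concrete, here is a small standalone sketch of how one line can be split into fields. It mirrors the parsing approach of the extract task in the DAG from Step 4, but parse_game_line is a hypothetical helper, and returning None for blank or malformed lines is an assumption added for illustration.

```python
import re
from typing import Optional

# Title followed by a four-digit year in parentheses, e.g. "Halo 3 (2007)".
_TITLE_YEAR_PATTERN = re.compile(r"(.+) \((\d{4})\)")


def parse_game_line(line: str) -> Optional[dict]:
    """Parse one `id ::: title (year) ::: genre ::: description` line.

    Returns None for blank or malformed lines instead of raising.
    """
    parts = [part.strip() for part in line.split(":::")]
    if len(parts) != 4:
        return None
    match = _TITLE_YEAR_PATTERN.fullmatch(parts[1])
    if match is None:
        return None
    title, year = match.groups()
    return {
        "title": title,
        "year": int(year),
        "genre": parts[2],
        "description": parts[3],
    }


if __name__ == "__main__":
    print(parse_game_line("9 ::: Minesweeper (1990) ::: puzzle ::: A logic game."))
```

Because the delimiter is three colons, titles that contain a single colon, such as Star Wars: Battlefront 2, are parsed without special handling.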

Step 4: Create your DAG

In your Astro project dags folder, create a new file called query_game_vectors.py. Paste the following code into the file:

"""
## Tutorial DAG: Load and query video game descriptions with MongoDB and OpenAI
"""

import logging
import os

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.mongo.hooks.mongo import MongoHook
from pendulum import datetime

t_log = logging.getLogger("airflow.task")

_MONGO_DB_CONN = os.getenv("MONGO_DB_CONN", "mongodb_default")
_MONGO_DB_DATABASE_NAME = os.getenv("MONGO_DB_DATABASE_NAME", "games")
_MONGO_DB_COLLECTION_NAME = os.getenv("MONGO_DB_COLLECTION_NAME", "games_nostalgia")
_MONGO_DB_SEARCH_INDEX_NAME = os.getenv("MONGO_DB_SEARCH_INDEX_NAME", "find_me_a_game")
_MONGO_DB_VECTOR_COLUMN_NAME = os.getenv("MONGO_DB_VECTOR_COLUMN_NAME", "vector")

_OPENAI_EMBEDDING_MODEL = os.getenv("OPENAI_EMBEDDING_MODEL", "text-embedding-3-small")
_OPENAI_EMBEDDING_MODEL_DIMENSIONS = int(
    os.getenv("OPENAI_EMBEDDING_MODEL_DIMENSIONS", "1536")
)  # cast to int: os.getenv returns a string when the variable is set

_DATA_TEXT_FILE_PATH = os.getenv("DATA_TEXT_FILE_PATH", "include/games.txt")

_COLLECTION_EXISTS_TASK_ID = "collection_already_exists"
_CREATE_COLLECTION_TASK_ID = "create_collection"
_CREATE_INDEX_TASK_ID = "create_search_index"
_INDEX_EXISTS_TASK_ID = "search_index_already_exists"


def _get_mongodb_database(
    mongo_db_conn_id: str = _MONGO_DB_CONN,
    mongo_db_database_name: str = _MONGO_DB_DATABASE_NAME,
):
    """
    Get the MongoDB database.
    Args:
        mongo_db_conn_id (str): The connection ID for the MongoDB connection.
        mongo_db_database_name (str): The name of the database.
    Returns:
        The MongoDB database.
    """
    hook = MongoHook(mongo_conn_id=mongo_db_conn_id)
    client = hook.get_conn()
    return client[mongo_db_database_name]


def _create_openai_embeddings(text: str, model: str):
    """
    Create embeddings for a text with the OpenAI API.
    Args:
        text (str): The text to create embeddings for.
        model (str): The OpenAI model to use.
    Returns:
        The embeddings for the text.
    """
    from openai import OpenAI

    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    response = client.embeddings.create(input=text, model=model)
    embeddings = response.data[0].embedding

    return embeddings


@dag(
    start_date=datetime(2024, 10, 1),
    schedule=None,
    catchup=False,
    max_consecutive_failed_dag_runs=5,
    tags=["mongodb"],
    doc_md=__doc__,
    params={
        "game_concepts": Param(
            ["fantasy", "quests"],
            type="array",
            description=(
                "What kind of game do you want to play today?"
                + " Add one concept per line."
            ),
        ),
    },
)
def query_game_vectors():

    @task.branch
    def check_for_collection() -> str:
        "Check if the provided collection already exists and decide on the next step."
        database = _get_mongodb_database()
        collection_list = database.list_collection_names()
        if _MONGO_DB_COLLECTION_NAME in collection_list:
            return _COLLECTION_EXISTS_TASK_ID
        else:
            return _CREATE_COLLECTION_TASK_ID

    @task(task_id=_CREATE_COLLECTION_TASK_ID)
    def create_collection():
        "Create a new collection in the database."
        database = _get_mongodb_database()
        database.create_collection(_MONGO_DB_COLLECTION_NAME)

    collection_already_exists = EmptyOperator(task_id=_COLLECTION_EXISTS_TASK_ID)
    collection_ready = EmptyOperator(
        task_id="collection_ready", trigger_rule="none_failed"
    )

    @task
    def extract() -> list:
        """
        Extract the games from the text file.
        Returns:
            list: A list with the games.
        """
        import re

        with open(_DATA_TEXT_FILE_PATH, "r") as f:
            games = [line for line in f if line.strip()]  # skip blank lines

        games_list = []

        for game in games:

            parts = game.split(":::")
            title_year = parts[1].strip()
            match = re.match(r"(.+) \((\d{4})\)", title_year)

            title, year = match.groups()
            year = int(year)

            genre = parts[2].strip()
            description = parts[3].strip()

            game_data = {
                "title": title,
                "year": year,
                "genre": genre,
                "description": description,
            }

            games_list.append(game_data)

        return games_list

    @task(map_index_template="{{ game_str }}")
    def transform_create_embeddings(game: dict) -> dict:
        """
        Create embeddings for the game description.
        Args:
            game (dict): A dictionary with the game's data.
        Returns:
            dict: The game's data with the embeddings.
        """
        embeddings = _create_openai_embeddings(
            text=game.get("description"), model=_OPENAI_EMBEDDING_MODEL
        )
        game[_MONGO_DB_VECTOR_COLUMN_NAME] = embeddings

        # optional: setting the custom map index
        from airflow.operators.python import get_current_context

        context = get_current_context()
        context["game_str"] = f"{game['title']} ({game['year']}) - {game['genre']}"

        return game

    @task(trigger_rule="none_failed", map_index_template="{{ game_str }}")
    def load_data_to_mongo_db(game_data: dict) -> None:
        """
        Load the game data to the MongoDB collection.
        Args:
            game_data (dict): A dictionary with the game's data.
        """

        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]

        filter_query = {
            "title": game_data["title"],
            "year": game_data["year"],
            "genre": game_data["genre"],
        }

        game_str = f"{game_data['title']} ({game_data['year']}) - {game_data['genre']}"

        existing_document = collection.find_one(filter_query)

        if existing_document:
            if existing_document.get("description") != game_data["description"]:
                collection.update_one(
                    filter_query, {"$set": game_data}  # also refreshes the embedding
                )
                t_log.info(f"Updated description for record: {game_str}")
            else:
                t_log.info(f"Skipped duplicate record: {game_str}")
        else:
            collection.update_one(
                filter_query, {"$setOnInsert": game_data}, upsert=True
            )
            t_log.info(f"Inserted record: {game_str}")

        # optional: setting the custom map index
        from airflow.operators.python import get_current_context

        context = get_current_context()
        context["game_str"] = game_str

    @task.branch
    def check_for_search_index() -> str:
        "Check if the provided index already exists and decide on the next step."
        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]
        index_list = collection.list_search_indexes().to_list()
        index_name_list = [index.get("name") for index in index_list]
        if _MONGO_DB_SEARCH_INDEX_NAME in index_name_list:
            return _INDEX_EXISTS_TASK_ID
        else:
            return _CREATE_INDEX_TASK_ID

    @task(task_id=_CREATE_INDEX_TASK_ID)
    def create_search_index():
        """
        Create a search index model for the MongoDB collection.
        """
        from pymongo.operations import SearchIndexModel

        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]

        search_index_model = SearchIndexModel(
            definition={
                "mappings": {
                    "dynamic": True,
                    "fields": {
                        _MONGO_DB_VECTOR_COLUMN_NAME: {
                            "type": "knnVector",
                            "dimensions": _OPENAI_EMBEDDING_MODEL_DIMENSIONS,
                            "similarity": "cosine",
                        }
                    },
                },
            },
            name=_MONGO_DB_SEARCH_INDEX_NAME,
        )

        collection.create_search_index(model=search_index_model)

    search_index_already_exists = EmptyOperator(task_id=_INDEX_EXISTS_TASK_ID)

    @task.sensor(
        poke_interval=10, timeout=3600, mode="poke", trigger_rule="none_failed"
    )
    def wait_for_full_indexing():
        """
        Wait for the search index to be fully built.
        """
        from airflow.sensors.base import PokeReturnValue

        database = _get_mongodb_database()
        collection = database[_MONGO_DB_COLLECTION_NAME]

        index_list = collection.list_search_indexes().to_list()
        index = next(
            (
                index
                for index in index_list
                if index.get("name") == _MONGO_DB_SEARCH_INDEX_NAME
            ),
            None,
        )

        if index:
            status = index.get("status")
            if status == "READY":
                t_log.info(f"Search index is {status}. Ready to query.")
                condition_met = True
            elif status == "FAILED":
                raise ValueError("Search index failed to build.")
            else:
                t_log.info(
                    f"Search index is {status}. Waiting for indexing to complete."
                )
                condition_met = False
        else:
            raise ValueError("Search index not found.")

        return PokeReturnValue(is_done=condition_met)

    @task
    def embed_concepts(**context):
        """
        Create embeddings for the provided concepts.
        """
        from openai import OpenAI

        client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        game_concepts = context["params"]["game_concepts"]
        game_concepts_str = " ".join(game_concepts)

        embeddings = client.embeddings.create(
            input=game_concepts_str, model=_OPENAI_EMBEDDING_MODEL
        )

        return embeddings.to_dict()

    @task
    def query(query_vector: dict):
        """
        Query the MongoDB collection for games based on the provided concepts.
        """

        db = _get_mongodb_database()
        collection = db[_MONGO_DB_COLLECTION_NAME]

        results = collection.aggregate(
            [
                {
                    "$vectorSearch": {
                        "exact": True,
                        "index": _MONGO_DB_SEARCH_INDEX_NAME,
                        "limit": 1,
                        "path": _MONGO_DB_VECTOR_COLUMN_NAME,
                        "queryVector": query_vector["data"][0]["embedding"],
                    }
                }
            ]
        )

        results_list = []

        for result in results:

            game_id = str(result["_id"])
            title = result["title"]
            year = result["year"]
            genre = result["genre"]
            description = result["description"]

            t_log.info(f"You should play {title}!")
            t_log.info(f"It was released in {year} and belongs to the {genre} genre.")
            t_log.info(f"Description: {description}")

            results_list.append(
                {
                    "game_id": game_id,
                    "title": title,
                    "year": year,
                    "genre": genre,
                    "description": description,
                }
            )

        return results_list

    _extract = extract()
    _transform_create_embeddings = transform_create_embeddings.expand(game=_extract)
    _load_data_to_mongo_db = load_data_to_mongo_db.expand(
        game_data=_transform_create_embeddings
    )

    _query = query(embed_concepts())

    chain(
        check_for_collection(),
        [create_collection(), collection_already_exists],
        collection_ready,
    )

    chain(
        collection_ready,
        check_for_search_index(),
        [create_search_index(), search_index_already_exists],
        wait_for_full_indexing(),
        _query,
    )

    chain(collection_ready, _load_data_to_mongo_db, _query)


query_game_vectors()
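The search index in the DAG above is defined with "similarity": "cosine". As a rough illustration of what that measure computes, the following standalone sketch calculates cosine similarity for small toy vectors. It is an assumption-level illustration only: cosine_similarity is a hypothetical helper, and the scores MongoDB reports may be scaled differently from this raw value.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return the cosine of the angle between two vectors of equal length."""
    if len(a) != len(b):
        raise ValueError("Vectors must have the same number of dimensions.")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("Cosine similarity is undefined for zero vectors.")
    return dot / (norm_a * norm_b)


if __name__ == "__main__":
    # Toy 2-dimensional "embeddings"; real ones have 1536 dimensions.
    print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # same direction
    print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # orthogonal
```

Vectors that point in the same direction score close to 1, and unrelated (orthogonal) vectors score close to 0. This is why the game whose description embedding is best aligned with the concept embedding is returned first.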

This DAG consists of thirteen tasks to make a simple ML orchestration pipeline.

  • First, the check_for_collection task checks if the games_nostalgia collection already exists in the games database. If it does, the collection creation is skipped; if not, the collection is created by the create_collection task.
  • Once the collection is ready, a similar pattern is used to create a search index find_me_a_game if it does not already exist.
  • Simultaneously, the game descriptions are ingested in an ETL pipeline whose transformation step creates vector embeddings using OpenAI’s text-embedding-3-small model. The embeddings are then stored in the games_nostalgia collection alongside the game data.
  • After the search index is created or found to already exist, the custom sensor wait_for_full_indexing makes sure the search index is fully built. The query task is triggered only once both the sensor and the data load have completed.
  • Finally, the query task queries the games_nostalgia collection for the game with the most similar description to the concepts provided in the Airflow params dictionary.
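The branching pattern described above depends on the none_failed trigger rule: after a branch, one of the two downstream tasks is always skipped, and a task with the default all_success rule would then be skipped as well. The following is a simplified pure-Python model of the two rules, not Airflow's actual implementation; the function names and state strings are chosen for illustration.

```python
from typing import List


def all_success(upstream_states: List[str]) -> bool:
    """Default rule: run only if every upstream task succeeded."""
    return all(state == "success" for state in upstream_states)


def none_failed(upstream_states: List[str]) -> bool:
    """Run if no upstream task failed, i.e. all succeeded or were skipped."""
    return all(state in ("success", "skipped") for state in upstream_states)


if __name__ == "__main__":
    # The collection already exists, so create_collection is skipped.
    upstream_of_collection_ready = ["skipped", "success"]
    print(all_success(upstream_of_collection_ready))  # False
    print(none_failed(upstream_of_collection_ready))  # True
```

This is why collection_ready, load_data_to_mongo_db, and wait_for_full_indexing set trigger_rule="none_failed" in the DAG: they need to run regardless of which branch was taken, as long as nothing upstream failed.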

Screenshot of the Airflow UI showing the query_game_vectors DAG graph.

Step 5: Run the DAG and review the data

Now you can run the DAG manually to find a game to play!

  1. Run astro dev start in your Astro project to start Airflow and open the Airflow UI at localhost:8080. Log in with admin as the username and password.

  2. In the Airflow UI, run the query_game_vectors DAG by clicking the play button. Then, provide Airflow params for game_concepts.

    Screenshot of the Airflow UI Trigger DAG view showing the concepts fantasy and quests selected as query params.

  3. After the DAG completes successfully, go to the task logs of the query task to see the game with the most similar description to the concepts you provided.

    [2024-10-21, 15:25:36 UTC] {mongo_db_tutorial.py:343} INFO - You should play World of Warcraft!
    [2024-10-21, 15:25:36 UTC] {mongo_db_tutorial.py:344} INFO - It was released in 2004 and belongs to the MMORPG genre.
    [2024-10-21, 15:25:36 UTC] {mongo_db_tutorial.py:345} INFO - Description: Players explore a vast fantasy world, completing quests, battling enemies, and forming alliances in an ever-changing landscape of adventure.
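The query task returns only the single best match. If you would like to see several candidates together with their similarity scores, one option is to raise limit and add a $project stage that exposes the vectorSearchScore metadata. The following is a minimal sketch under that assumption. build_vector_search_pipeline is a hypothetical helper that is not part of the tutorial DAG, the default names are taken from the DAG's defaults, and the pipeline has not been run against the tutorial cluster.

```python
from typing import Any, Dict, List


def build_vector_search_pipeline(
    query_vector: List[float],
    index_name: str = "find_me_a_game",
    path: str = "vector",
    limit: int = 3,
) -> List[Dict[str, Any]]:
    """Build an aggregation pipeline that returns the top matches with scores."""
    return [
        {
            "$vectorSearch": {
                "exact": True,  # exact nearest-neighbor search, as in the DAG
                "index": index_name,
                "limit": limit,
                "path": path,
                "queryVector": query_vector,
            }
        },
        {
            # Keep the readable fields and add the similarity score.
            "$project": {
                "_id": 0,
                "title": 1,
                "year": 1,
                "genre": 1,
                "score": {"$meta": "vectorSearchScore"},
            }
        },
    ]
```

You could pass the returned list to collection.aggregate() in place of the pipeline in the query task and log the score next to each title.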

Conclusion

Congratulations! You used Airflow and MongoDB to get a game suggestion! You can now use Airflow to orchestrate MongoDB operations in your own pipelines.