Orchestrate semantic querying in Qdrant with Airflow

Qdrant is an open-source vector database and similarity search engine designed for AI applications.

In this tutorial, you’ll use the Qdrant Airflow provider to write a DAG that generates embeddings in parallel and performs semantic retrieval based on user input.

Airflow provides useful operational and orchestration tooling when you run operations in Qdrant in response to data events or build parallel tasks that generate vector embeddings. With Airflow, you can also set up monitoring and alerting on your pipelines for full observability.

Time to complete

This tutorial takes approximately 30 minutes to complete.

Prerequisites

Step 1: Set up the project

  1. Create a new Astro project:

    mkdir qdrant-airflow-tutorial && cd qdrant-airflow-tutorial
    astro dev init
  2. To use Qdrant in Airflow, install the Qdrant Airflow provider by adding the following to your requirements.txt file:

    apache-airflow-providers-qdrant==1.1.0

Step 2: Configure credentials

Add the following code to your .env file to store your Hugging Face access token as an environment variable and to create an Airflow connection to Qdrant. Make sure to update the sample code with your Hugging Face access token and Qdrant instance details.

HUGGINGFACE_TOKEN="<YOUR_HUGGINGFACE_ACCESS_TOKEN>"
AIRFLOW_CONN_QDRANT_DEFAULT='{
"conn_type": "qdrant",
"host": "xyz-example.eu-central.aws.cloud.qdrant.io:6333",
"password": "<YOUR_QDRANT_API_KEY>"
}'
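Airflow treats an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection ID as a connection definition, and recent Airflow 2 releases accept a JSON value like the one above. The stdlib-only sketch below illustrates that naming convention for the `qdrant_default` connection ID used later in the DAG. The helper `lookup_connection` is hypothetical (it is not an Airflow API), and it handles only JSON values, whereas Airflow also accepts URI-style connection strings.

```python
import json
from typing import Optional


def lookup_connection(conn_id: str, env: dict) -> Optional[dict]:
    """Hypothetical helper mirroring Airflow's env-var connection naming.

    Airflow looks for a variable named "AIRFLOW_CONN_" + conn_id.upper().
    This sketch only parses JSON-formatted values.
    """
    raw = env.get("AIRFLOW_CONN_" + conn_id.upper())
    if raw is None:
        return None
    return json.loads(raw)


# Same values as the .env sample above, as a plain dictionary.
sample_env = {
    "HUGGINGFACE_TOKEN": "<YOUR_HUGGINGFACE_ACCESS_TOKEN>",
    "AIRFLOW_CONN_QDRANT_DEFAULT": """{
"conn_type": "qdrant",
"host": "xyz-example.eu-central.aws.cloud.qdrant.io:6333",
"password": "<YOUR_QDRANT_API_KEY>"
}""",
}

conn = lookup_connection("qdrant_default", sample_env)
print(conn["conn_type"])  # qdrant
```

A malformed JSON value (for example, a missing comma) makes `json.loads` raise, which is a quick way to catch typos in the .env entry before starting Airflow.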

Step 3: Add your data

Paste the following sample data into a file called books.txt in your include directory.

1 | To Kill a Mockingbird (1960) | fiction | Harper Lee's Pulitzer Prize-winning novel explores racial injustice and moral growth through the eyes of young Scout Finch in the Deep South.
2 | Harry Potter and the Sorcerer's Stone (1997) | fantasy | J.K. Rowling's magical tale follows Harry Potter as he discovers his wizarding heritage and attends Hogwarts School of Witchcraft and Wizardry.
3 | The Great Gatsby (1925) | fiction | F. Scott Fitzgerald's classic novel delves into the glitz, glamour, and moral decay of the Jazz Age through the eyes of narrator Nick Carraway and his enigmatic neighbour, Jay Gatsby.
4 | 1984 (1949) | dystopian | George Orwell's dystopian masterpiece paints a chilling picture of a totalitarian society where individuality is suppressed and the truth is manipulated by a powerful regime.
5 | The Catcher in the Rye (1951) | fiction | J.D. Salinger's iconic novel follows disillusioned teenager Holden Caulfield as he navigates the complexities of adulthood and society's expectations in post-World War II America.
6 | Pride and Prejudice (1813) | romance | Jane Austen's beloved novel revolves around the lively and independent Elizabeth Bennet as she navigates love, class, and societal expectations in Regency-era England.
7 | The Hobbit (1937) | fantasy | J.R.R. Tolkien's adventure follows Bilbo Baggins, a hobbit who embarks on a quest with a group of dwarves to reclaim their homeland from the dragon Smaug.
8 | The Lord of the Rings (1954-1955) | fantasy | J.R.R. Tolkien's epic fantasy trilogy follows the journey of Frodo Baggins to destroy the One Ring and defeat the Dark Lord Sauron in the land of Middle-earth.
9 | The Alchemist (1988) | fiction | Paulo Coelho's philosophical novel follows Santiago, an Andalusian shepherd boy, on a journey of self-discovery and spiritual awakening as he searches for a hidden treasure.
10 | The Da Vinci Code (2003) | mystery/thriller | Dan Brown's gripping thriller follows symbologist Robert Langdon as he unravels clues hidden in art and history while trying to solve a murder mystery with far-reaching implications.
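Each line of books.txt holds four pipe-separated fields: an ID, the title with its publication year, a genre, and a description. To sanity-check the file before running the DAG, you can use a standalone sketch that mirrors the parsing in the import_books task. The function name `parse_books` is hypothetical, and unlike the loop in import_books it also skips blank lines, such as an extra empty line at the end of the file, which would otherwise break the four-way unpacking.

```python
def parse_books(lines):
    """Parse 'id | title | genre | description' lines into dictionaries."""
    books = []
    for line in lines:
        if not line.strip():
            # Ignore empty lines so they don't break the unpacking below.
            continue
        _, title, genre, description = line.split("|")
        books.append(
            {
                "title": title.strip(),
                "genre": genre.strip(),
                "description": description.strip(),
            }
        )
    return books


sample = [
    "7 | The Hobbit (1937) | fantasy | J.R.R. Tolkien's adventure follows Bilbo Baggins, a hobbit who embarks on a quest with a group of dwarves to reclaim their homeland from the dragon Smaug.\n",
    "\n",
]
print(parse_books(sample)[0]["title"])  # The Hobbit (1937)
```

Because an open file object yields lines, the same function accepts `open("include/books.txt")` directly.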

Step 4: Create your DAG

  1. In your dags folder, create a file called books_recommend.py.

  2. Copy the following Recommend books DAG code into the books_recommend.py file.

    Recommend books DAG
    import os
    import requests

    from airflow.decorators import dag, task
    from airflow.models.baseoperator import chain
    from airflow.models.param import Param
    from airflow.providers.qdrant.hooks.qdrant import QdrantHook
    from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator
    from pendulum import datetime
    from qdrant_client import models


    QDRANT_CONNECTION_ID = "qdrant_default"
    DATA_FILE_PATH = "include/books.txt"
    COLLECTION_NAME = "airflow_tutorial_collection"

    # EMBEDDING_DIMENSION must match the output size of the chosen model.
    EMBEDDING_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
    EMBEDDING_DIMENSION = 384
    SIMILARITY_METRIC = models.Distance.COSINE


    def embed(text: str) -> list:
        """Return the embedding of `text` from the Hugging Face Inference API."""
        HUGGINGFACE_URL = f"https://api-inference.huggingface.co/pipeline/feature-extraction/{EMBEDDING_MODEL_ID}"
        response = requests.post(
            HUGGINGFACE_URL,
            headers={"Authorization": f"Bearer {os.getenv('HUGGINGFACE_TOKEN')}"},
            json={"inputs": [text], "options": {"wait_for_model": True}},
        )
        # Fail the task with a clear HTTP error instead of failing later
        # while trying to index into an error response.
        response.raise_for_status()
        # The API returns one vector per input; we sent a single input.
        return response.json()[0]


    @dag(
        dag_id="books_recommend",
        start_date=datetime(2023, 10, 18),
        schedule=None,
        catchup=False,
        params={"preference": Param("Something suspenseful and thrilling.", type="string")},
    )
    def recommend_book():
        @task
        def import_books(text_file_path: str) -> list:
            data = []
            with open(text_file_path, "r") as f:
                for line in f:
                    # Skip blank lines, e.g. an empty line at the end of the file.
                    if not line.strip():
                        continue
                    _, title, genre, description = line.split("|")
                    data.append(
                        {
                            "title": title.strip(),
                            "genre": genre.strip(),
                            "description": description.strip(),
                        }
                    )

            return data

        @task
        def init_collection():
            hook = QdrantHook(conn_id=QDRANT_CONNECTION_ID)

            # Drop the collection if it already exists, then create it from scratch.
            hook.conn.recreate_collection(
                COLLECTION_NAME,
                vectors_config=models.VectorParams(
                    size=EMBEDDING_DIMENSION, distance=SIMILARITY_METRIC
                ),
            )

        @task
        def embed_description(data: dict) -> list:
            return embed(data["description"])

        books = import_books(text_file_path=DATA_FILE_PATH)
        # Dynamic task mapping: one embed_description instance per book.
        embeddings = embed_description.expand(data=books)

        qdrant_vector_ingest = QdrantIngestOperator(
            conn_id=QDRANT_CONNECTION_ID,
            task_id="qdrant_vector_ingest",
            collection_name=COLLECTION_NAME,
            payload=books,
            vectors=embeddings,
        )

        @task
        def embed_preference(**context) -> list:
            # Read the preference entered when the DAG run was triggered.
            user_mood = context["params"]["preference"]
            response = embed(text=user_mood)

            return response

        @task
        def search_qdrant(
            preference_embedding: list,
        ) -> None:
            hook = QdrantHook(conn_id=QDRANT_CONNECTION_ID)

            # Fetch only the closest book, including its stored payload.
            result = hook.conn.search(
                collection_name=COLLECTION_NAME,
                query_vector=preference_embedding,
                limit=1,
                with_payload=True,
            )

            print("Book recommendation: " + result[0].payload["title"])
            print("Description: " + result[0].payload["description"])

        chain(
            init_collection(),
            qdrant_vector_ingest,
            search_qdrant(embed_preference()),
        )


    recommend_book()

This Qdrant demo DAG consists of six tasks that generate embeddings for the data corpus in parallel and perform semantic retrieval based on user input.

  • import_books: This task reads a text file containing information about the books (such as title, genre, and description) and then returns the data as a list of dictionaries.

  • init_collection: This task initializes a collection in the Qdrant database, where you store the vector representations of the book descriptions. The recreate_collection() method deletes the collection first if it already exists, which avoids the error that Qdrant raises when you try to create a collection that already exists.

  • embed_description: This is a dynamic task that creates one mapped task instance for each book in the list. The task uses the embed function to generate vector embeddings for each description. To use a different embedding model, you can adjust the EMBEDDING_MODEL_ID and EMBEDDING_DIMENSION values.

  • embed_preference: This task takes the user's input and converts it into a vector using the same pre-trained model used for the book descriptions.

  • qdrant_vector_ingest: This task ingests the book data into the Qdrant collection using the QdrantIngestOperator, associating each book description with its corresponding vector embedding.

  • search_qdrant: Finally, this task performs a search in the Qdrant database using the vectorized user preference. It finds the most relevant book in the collection based on vector similarity.

    Graph view of the Qdrant demo DAG, showing each of its six tasks.
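Conceptually, embed_description.expand(data=books) fans out one task instance per book, and the ingest step then pairs the i-th payload with the i-th vector. The framework-free sketch below imitates that fan-out with a thread pool so you can see why the ordering matters. `fake_embed` is a hypothetical stand-in for the Hugging Face call, the book entries are shortened for illustration, and this is not how Airflow executes mapped tasks internally (Airflow schedules each mapped instance as a separate task).

```python
from concurrent.futures import ThreadPoolExecutor


def fake_embed(text: str) -> list:
    """Hypothetical stand-in for embed(): a tiny, deterministic 'vector'."""
    return [float(len(text)), float(text.count(" "))]


books = [
    {"title": "The Hobbit (1937)", "description": "A hobbit joins a quest."},
    {"title": "1984 (1949)", "description": "A chilling totalitarian society."},
]

# One call per book, analogous to one mapped task instance per list element.
with ThreadPoolExecutor() as pool:
    vectors = list(pool.map(lambda book: fake_embed(book["description"]), books))

# Executor.map returns results in input order, so vectors[i] belongs to books[i].
# The ingest step relies on the same positional pairing of payloads and vectors.
points = list(zip(books, vectors))
for payload, vector in points:
    print(payload["title"], vector)
```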

Step 5: Run your DAG

  1. Run astro dev start in your Astro project to start Airflow and open the Airflow UI at localhost:8080.

  2. In the Airflow UI, run the books_recommend DAG by clicking the play button. You’ll be asked for input about your book preference.

    Example of the trigger form in the Airflow UI, where Airflow prompts you to enter your book preference.

  3. View the output of your search in the logs of the search_qdrant task.

    Example Qdrant output showing the title and description of the recommended book.
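The search_qdrant task asks Qdrant for the single stored vector (limit=1) that is most similar to the preference embedding, using the cosine metric configured in init_collection. The brute-force sketch below reproduces that ranking in plain Python on made-up three-dimensional vectors; the vectors and the "suspenseful" query are illustrative assumptions, not real all-MiniLM-L6-v2 embeddings. Qdrant typically answers such queries with an HNSW index rather than comparing the query against every stored point.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def search_top_k(query, points, limit=1):
    """Return (score, payload) pairs for the `limit` most similar vectors."""
    scored = [(cosine_similarity(query, vec), payload) for vec, payload in points]
    # Sort on the score only, highest similarity first.
    scored.sort(key=lambda item: item[0], reverse=True)
    return scored[:limit]


# Toy 3-dimensional vectors; the real collection stores 384-dimensional ones.
points = [
    ([0.9, 0.1, 0.0], {"title": "The Da Vinci Code (2003)"}),
    ([0.1, 0.9, 0.1], {"title": "The Hobbit (1937)"}),
    ([0.0, 0.2, 0.9], {"title": "Pride and Prejudice (1813)"}),
]
preference = [0.8, 0.2, 0.1]  # hypothetical "suspenseful" query vector

best_score, best_payload = search_top_k(preference, points)[0]
print("Book recommendation: " + best_payload["title"])
```

Raising `limit` returns a ranked list instead of a single book, which is the same knob the `limit` argument controls in the search_qdrant task.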