Pgvector is an open source extension for PostgreSQL databases that adds the ability to store and query high-dimensional object embeddings. The pgvector Airflow provider offers modules that make it easy to integrate pgvector with Airflow.
In this tutorial, you use Airflow to orchestrate the embedding of book descriptions with the OpenAI API, ingest the embeddings into a PostgreSQL database with pgvector installed, and query the database for books that match a user-provided mood.
Pgvector allows you to store objects alongside their vector embeddings and to query these objects based on their similarity. Vector embeddings are key components of many modern machine learning models such as LLMs or ResNet.
Integrating PostgreSQL with pgvector and Airflow into one end-to-end machine learning pipeline allows you to:
This tutorial takes approximately 30 minutes to complete (reading your suggested book not included).
To get the most out of this tutorial, make sure you have an understanding of:
If you want to use a vectorizer other than OpenAI, adjust the create_embeddings function at the start of the DAG. This tutorial uses a local PostgreSQL database created as a Docker container. The image comes with pgvector preinstalled.
The example code from this tutorial is also available on GitHub.
Create a new Astro project by running astro dev init in a new, empty directory.
Add the apache-airflow-providers-pgvector and openai packages to your requirements.txt file to install the pgvector Airflow provider and the OpenAI Python client in your Astro project.
This tutorial uses a local PostgreSQL database running in a Docker container. To add a second PostgreSQL container to your Astro project, create a new file in your project’s root directory called docker-compose.override.yml and add the following. The ankane/pgvector image builds a PostgreSQL database with pgvector preinstalled.
To create an Airflow connection to the PostgreSQL database, add the following to your .env file. If you are using the OpenAI API for embeddings, set the OPENAI_API_KEY environment variable to your own API key.
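As a rough illustration of what such a .env entry can look like, the Python sketch below builds an AIRFLOW_CONN_<CONN_ID> line in Airflow's JSON connection format (supported since Airflow 2.3). The connection ID postgres_default and every connection value (login, password, host, port, schema) are assumptions; replace them with the values that match your docker-compose.override.yml and the connection ID your DAG uses.

```python
import json

# Assumed values: match them to the pgvector service in your
# docker-compose.override.yml before using the output.
connection = {
    "conn_type": "postgres",
    "login": "postgres",
    "password": "postgres",
    "host": "host.docker.internal",  # assumption: DB reached through the Docker host
    "port": 5433,  # assumption: host port mapped to the container's 5432
    "schema": "postgres",
}

# Airflow picks up connections from variables named AIRFLOW_CONN_<CONN_ID>,
# with the connection ID upper-cased; the ID below is hypothetical.
conn_id = "postgres_default"
env_line = f"AIRFLOW_CONN_{conn_id.upper()}='{json.dumps(connection)}'"
print(env_line)
```

Paste the printed line into .env, adjusting the values first. Writing the connection as JSON keeps each field readable and avoids percent-encoding special characters in a URI.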
The DAG in this tutorial runs a query on vectorized book descriptions from Goodreads, but you can adjust the DAG to use any data you want.
Create a new file called book_data.txt in the include directory.
Copy the book descriptions from the book_data.txt file in Astronomer’s GitHub repository into your file to get a list of great books.
If you want to add your own books, make sure the data is in the following format:
One book corresponds to one line in the file.
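The sketch below shows one way to parse such a file into dictionaries keyed like the Book table columns. It assumes a line format of <id> ::: <title> (<year>) ::: <author> ::: <description>; verify this against the format of your own book_data.txt. The parse_book_line helper and the deterministic, content-derived book_id (built with uuid.uuid5) are hypothetical choices, not necessarily what the tutorial DAG does.

```python
import re
import uuid
from typing import Optional

# Assumed line format (check it against your own book_data.txt):
#   <id> ::: <title> (<year>) ::: <author> ::: <description>
TITLE_YEAR = re.compile(r"(.+) \((\d{4})\)$")


def parse_book_line(line: str) -> Optional[dict]:
    """Turn one ':::'-delimited line into a dict shaped like a Book row.

    Returns None for malformed lines so the caller can skip them.
    """
    parts = [p.strip() for p in line.split(":::")]
    if len(parts) != 4:
        return None
    match = TITLE_YEAR.match(parts[1])
    if match is None:
        return None
    title, year = match.group(1), int(match.group(2))
    author, description = parts[2], parts[3]
    # Hypothetical choice: derive book_id from the content so re-reading the
    # same file yields the same IDs across DAG runs.
    book_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{title} {year} {author} {description}"))
    return {
        "book_id": book_id,
        "title": title,
        "year": year,
        "author": author,
        "description": description,
    }


lines = [
    "1 ::: Example Title (1999) ::: Jane Doe ::: A made-up description.",
    "not a valid line",
]
books = [b for b in (parse_book_line(line) for line in lines) if b is not None]
print(books[0]["title"], books[0]["year"])  # Example Title 1999
```

Deterministic IDs are what make it possible to compare a freshly parsed file against the book_id values already stored in the table.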
In your dags folder, create a file called query_book_vectors.py.
Copy the following code into the file. If you want to use a vectorizer other than OpenAI, make sure to both adjust the create_embeddings function at the start of the DAG and set the correct MODEL_VECTOR_LENGTH.
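If you swap out OpenAI, the replacement only has to turn a string into a list of floats whose length equals MODEL_VECTOR_LENGTH. The following hypothetical create_embeddings stand-in hashes character trigrams into a fixed-size vector, which can be handy for testing the pipeline offline without an API key. It is not a semantic embedding model, and its single-argument signature is an assumption; match whatever signature your DAG's create_embeddings uses.

```python
from __future__ import annotations

import hashlib
import math

# Must equal the vector length your model produces; 1536 matches
# text-embedding-ada-002 and the VECTOR(1536) column in this tutorial.
MODEL_VECTOR_LENGTH = 1536


def create_embeddings(text: str) -> list[float]:
    """Hypothetical offline stand-in vectorizer (not semantic).

    Hashes character trigrams into MODEL_VECTOR_LENGTH buckets, then
    L2-normalizes the counts so the vector has length 1.
    """
    vector = [0.0] * MODEL_VECTOR_LENGTH
    padded = f"  {text.lower()}  "
    for i in range(len(padded) - 2):
        digest = hashlib.sha256(padded[i:i + 3].encode("utf-8")).digest()
        bucket = int.from_bytes(digest[:4], "big") % MODEL_VECTOR_LENGTH
        vector[bucket] += 1.0
    norm = math.sqrt(sum(v * v for v in vector)) or 1.0
    return [v / norm for v in vector]


embedding = create_embeddings("A cozy mystery on a rainy day")
# A length mismatch here would make inserts into VECTOR(1536) fail.
assert len(embedding) == MODEL_VECTOR_LENGTH
```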
The query_book_vectors DAG consists of nine tasks that form a simple ML orchestration pipeline.
The enable_vector_extension_if_not_exists task uses a PostgresOperator to enable the pgvector extension in the PostgreSQL database.
The create_table_if_not_exists task creates the Book table in PostgreSQL. Note the VECTOR() datatype used for the vector column. This datatype is added to PostgreSQL by the pgvector extension and must be declared with the vector length of your vectorizer as its argument. This example uses OpenAI’s text-embedding-ada-002 model, which creates 1536-dimensional vectors, so the vector column is defined with the type VECTOR(1536) using parameterized SQL.
The get_already_imported_book_ids task queries the Book table and returns the book_id values of all books that were already stored with their vectors in previous DAG runs.
The import_book_data task uses the @task decorator to read the book data from the book_data.txt file and return it as a list of dictionaries whose keys correspond to the columns of the Book table.
The create_embeddings_book_data task is dynamically mapped over the list of dictionaries returned by the import_book_data task to parallelize the vector embedding of all book descriptions that have not been added to the Book table in previous DAG runs. The create_embeddings function defines how the embeddings are computed and can be modified to use other embedding models. If all books in the list have already been added to the Book table, all mapped task instances are skipped.
The create_embeddings_query task applies the same create_embeddings function to the book mood the user provided via Airflow params.
The import_embeddings_to_pgvector task uses the PgVectorIngestOperator to insert the book data, including the embedding vectors, into the PostgreSQL database. This task is dynamically mapped to import the embeddings of one book at a time. The mapped task instances for books that were already imported in previous DAG runs are skipped.
The get_a_book_suggestion task queries the PostgreSQL database for the book that is most similar to the user-provided mood using nearest neighbor search with pgvector’s <-> (L2 distance) operator. Note how the vector of the user-provided book mood (query_vector) is cast to the VECTOR datatype before the similarity search: ORDER BY vector <-> CAST(%(query_vector)s AS VECTOR).
The print_book_suggestion task prints the book suggestion to the task logs.
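To make the skip logic and the similarity query more concrete, here is a simplified pure-Python analogue with no Airflow or database involved. books_to_embed keeps only books whose book_id is not yet in the table, and suggest_book picks the row with the smallest L2 distance to the query vector, which is the row an ORDER BY vector <-> ... query returns first. The function names and the toy 3-dimensional vectors are illustrative assumptions, not code from the DAG.

```python
from __future__ import annotations

import math


def euclidean(a: list[float], b: list[float]) -> float:
    """L2 distance, the metric behind pgvector's <-> operator."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def books_to_embed(books: list[dict], already_imported: set[str]) -> list[dict]:
    """Keep only books whose book_id is not stored yet (the others get skipped)."""
    return [b for b in books if b["book_id"] not in already_imported]


def suggest_book(rows: list[dict], query_vector: list[float]) -> dict:
    """Return the first row of ORDER BY vector <-> query_vector."""
    return min(rows, key=lambda row: euclidean(row["vector"], query_vector))


# Toy 3-dimensional vectors; real rows carry VECTOR(1536) embeddings.
stored = [
    {"book_id": "a", "title": "Book A", "vector": [1.0, 0.0, 0.0]},
    {"book_id": "b", "title": "Book B", "vector": [0.0, 1.0, 0.0]},
]
incoming = stored + [{"book_id": "c", "title": "Book C"}]

new_books = books_to_embed(incoming, {row["book_id"] for row in stored})
print([b["book_id"] for b in new_books])  # ['c']
print(suggest_book(stored, [0.1, 0.9, 0.0])["title"])  # Book B
```

In the DAG, the database performs the distance computation and sorting, so only the closest row has to travel back to Airflow.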
For information on more advanced search techniques in pgvector, see the pgvector README.
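pgvector supports several distance operators, including <-> (L2 distance), <#> (negative inner product), and <=> (cosine distance). The sketch below reimplements each one in plain Python to show that they can rank the same candidates differently. The query and candidate vectors are made up for illustration.

```python
import math


def l2_distance(a, b):
    """pgvector a <-> b: Euclidean distance."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def negative_inner_product(a, b):
    """pgvector a <#> b: inner product, negated so smaller means more similar."""
    return -sum(x * y for x, y in zip(a, b))


def cosine_distance(a, b):
    """pgvector a <=> b: 1 minus cosine similarity."""
    dot = sum(x * y for x, y in zip(a, b))
    norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norms


query = [1.0, 1.0]
candidates = {"short": [0.5, 0.5], "long": [3.0, 2.0]}
operators = {"<->": l2_distance, "<#>": negative_inner_product, "<=>": cosine_distance}

# Like ORDER BY vector <op> query LIMIT 1: pick the candidate with the smallest value.
best_by_operator = {
    op: min(candidates, key=lambda name, fn=fn: fn(candidates[name], query))
    for op, fn in operators.items()
}
print(best_by_operator)  # {'<->': 'short', '<#>': 'long', '<=>': 'short'}
```

With unit-length vectors, all three operators produce the same ordering, because the squared L2 distance then equals 2 minus twice the cosine similarity. OpenAI documents its embeddings as normalized to length 1, so with text-embedding-ada-002 the operator choice should not change which book is suggested; it matters more for vectorizers that return unnormalized vectors.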
Run astro dev start in your Astro project directory to start Airflow, then open the Airflow UI at localhost:8080.
In the Airflow UI, click the play button to run the query_book_vectors DAG. Then, provide your desired book_mood as an Airflow param.

View your book suggestion in the task logs of the print_book_suggestion task:
Congratulations! You used Airflow and pgvector to get a book suggestion! You can now use Airflow to orchestrate pgvector operations in your own machine learning pipelines. Additionally, you remembered the satisfaction and joy of spending hours reading a good book and supported your local library.