Orchestrate pgvector operations with Apache Airflow

Pgvector is an open source extension for PostgreSQL that lets you store and query high-dimensional vector embeddings alongside your relational data. The pgvector Airflow provider offers modules to easily integrate pgvector with Airflow.

In this tutorial, you use Airflow to orchestrate the embedding of book descriptions with the OpenAI API, ingest the embeddings into a PostgreSQL database with pgvector installed, and query the database for books that match a user-provided mood.

Why use Airflow with pgvector?

Pgvector allows you to store objects alongside their vector embeddings and to query these objects based on their similarity. Vector embeddings are key components of many modern machine learning models such as LLMs or ResNet.
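
The core pattern is compact. Below is a minimal sketch, assuming a pgvector-enabled database like the one created later in this tutorial and the psycopg2 driver (the connection string is a placeholder): it stores two toy vectors and retrieves the nearest neighbor with pgvector's `<->` distance operator.

    import psycopg2

    # Placeholder connection string; adjust to your own database.
    conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5433/postgres")
    with conn, conn.cursor() as cur:
        cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
        cur.execute("CREATE TABLE IF NOT EXISTS items (id INT, embedding VECTOR(3));")
        cur.execute("INSERT INTO items VALUES (1, '[1,0,0]'), (2, '[0,1,0]');")
        # `<->` is pgvector's L2 distance operator: closest vector first.
        cur.execute("SELECT id FROM items ORDER BY embedding <-> '[0.9,0.1,0]' LIMIT 1;")
        print(cur.fetchone())  # (1,)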

Integrating PostgreSQL with pgvector and Airflow into one end-to-end machine learning pipeline allows you to:

  • Use Airflow’s data-driven scheduling to run operations involving vectors stored in PostgreSQL based on upstream events in your data ecosystem, such as when a new model is trained or a new dataset is available (see the sketch after this list).
  • Run dynamic queries based on upstream events in your data ecosystem or user input via Airflow params on vectors stored in PostgreSQL to retrieve similar objects.
  • Add Airflow features like retries and alerts to your pgvector operations.
  • Check your vector database for the existence of a unique key before running potentially costly embedding operations on your data.
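
For example, the data-driven scheduling mentioned in the first point could look like the following sketch, which assumes Airflow 3's asset syntax and a hypothetical new_book_descriptions asset that an upstream DAG updates:

    from airflow.sdk import Asset, dag, task

    # Hypothetical asset name; an upstream DAG would declare it as an outlet.
    @dag(schedule=[Asset("new_book_descriptions")])
    def embed_on_new_data():
        @task
        def embed_and_ingest():
            ...  # embed the new descriptions and ingest them into pgvector

        embed_and_ingest()

    embed_on_new_data()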

Time to complete

This tutorial takes approximately 30 minutes to complete (reading your suggested book not included).

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

  • Airflow fundamentals, such as writing DAGs and defining tasks.
  • Airflow decorators and dynamic task mapping.
  • Airflow connections and params.
  • The basics of PostgreSQL.

Prerequisites

  • The Astro CLI.
  • An OpenAI API key of at least tier 1 if you want to use OpenAI for vectorization. If you do not want to use OpenAI, you can adapt the create_embeddings function at the start of the DAG to use a different vectorizer, as sketched after this list.
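
    For example, a hypothetical drop-in replacement based on the open source sentence-transformers package (not part of this tutorial's requirements) could look like the sketch below. Note that the all-MiniLM-L6-v2 model produces 384-dimensional vectors, so you would also set MODEL_VECTOR_LENGTH = 384.

      from sentence_transformers import SentenceTransformer  # add to requirements.txt

      def create_embeddings(text: str, model: str = "all-MiniLM-L6-v2"):
          """Embed text locally instead of calling the OpenAI API.

          all-MiniLM-L6-v2 returns 384-dimensional vectors,
          so set MODEL_VECTOR_LENGTH = 384.
          """
          encoder = SentenceTransformer(model)
          return encoder.encode(text).tolist()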

This tutorial uses a local PostgreSQL database created as a Docker container. The image comes with pgvector preinstalled.

Info

The example code from this tutorial is also available on GitHub.

Step 1: Configure your Astro project

  1. Create a new Astro project:

    $ mkdir astro-pgvector-tutorial && cd astro-pgvector-tutorial
    $ astro dev init
  2. Add the following two packages to your requirements.txt file to install the pgvector Airflow provider and the OpenAI Python client in your Astro project:

    apache-airflow-providers-pgvector==1.0.0
    openai==1.3.2
  3. This tutorial uses a local PostgreSQL database running in a Docker container. To add a second PostgreSQL container to your Astro project, create a new file in your project’s root directory called docker-compose.override.yml and add the following. The ankane/pgvector image builds a PostgreSQL database with pgvector preinstalled.

    services:
      postgres_pgvector:
        image: ankane/pgvector
        volumes:
          - ${PWD}/include/postgres:/var/lib/postgresql/data
          - ${PWD}/include:/include
        networks:
          - airflow
        ports:
          - 5433:5432
        environment:
          - POSTGRES_USER=postgres
          - POSTGRES_PASSWORD=postgres
      # Airflow containers
      scheduler:
        networks:
          - airflow
      api-server:
        networks:
          - airflow
      triggerer:
        networks:
          - airflow
      postgres:
        networks:
          - airflow
  4. To create an Airflow connection to the PostgreSQL database, add the following to your .env file. If you are using the OpenAI API for embeddings, replace <your-openai-api-key> with your own OpenAI API key. A quick way to test the connection is sketched after this list.

    AIRFLOW_CONN_POSTGRES_DEFAULT='{
    "conn_type": "postgres",
    "login": "postgres",
    "password": "postgres",
    "host": "host.docker.internal",
    "port": 5433,
    "schema": "postgres"
    }'
    OPENAI_API_KEY="<your-openai-api-key>"
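
    Once your project is running (see Step 4), you can verify the connection with a sketch like the following from inside a running Airflow container (for example, via astro dev bash); it is not part of the tutorial's DAG:

      from airflow.providers.postgres.hooks.postgres import PostgresHook

      # Uses the AIRFLOW_CONN_POSTGRES_DEFAULT connection defined in .env.
      hook = PostgresHook(postgres_conn_id="postgres_default")
      print(hook.get_first("SELECT version();"))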

Step 2: Add your data

The DAG in this tutorial runs a query on vectorized book descriptions from Goodreads, but you can adjust the DAG to use any data you want.

  1. Create a new file called book_data.txt in the include directory.

  2. Copy the book descriptions from the book_data.txt file in Astronomer’s GitHub repository into your new file to use a list of great books.

Tip

If you want to add your own books, make sure the data is in the following format:

<index integer> ::: <title> (<year of publication>) ::: <author> ::: <description>

One book corresponds to one line in the file.
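
For example, a line for the book this tutorial suggests later could look like:

1 ::: The Idea of the World (2019) ::: Bernardo Kastrup ::: A rigorous case for the primacy of mind in nature, from philosophy to neuroscience, psychology and physics.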

Step 3: Create your DAG

  1. In your dags folder, create a file called query_book_vectors.py.

  2. Copy the following code into the file. If you want to use a vectorizer other than OpenAI, make sure to adjust both the create_embeddings function at the start of the DAG and the MODEL_VECTOR_LENGTH constant.

    1"""
    2## Vectorize book descriptions with OpenAI and store them in Postgres with pgvector
    3
    4This DAG shows how to use the OpenAI API 1.0+ to vectorize book descriptions and
    5store them in Postgres with the pgvector extension.
    6It will also help you pick your next book to read based on a mood you describe.
    7
    8You will need to set the following environment variables:
    9- `AIRFLOW_CONN_POSTGRES_DEFAULT`: an Airflow connection to your Postgres database
    10 that has pgvector installed
    11- `OPENAI_API_KEY`: your OpenAI API key
    12"""
    13
    14from airflow.sdk import dag, task
    15from airflow.models.baseoperator import chain
    16from airflow.models.param import Param
    17from airflow.providers.pgvector.operators.pgvector import PgVectorIngestOperator
    18from airflow.providers.postgres.operators.postgres import PostgresOperator
    19from airflow.exceptions import AirflowSkipException
    20from pendulum import datetime
    21from openai import OpenAI
    22import uuid
    23import re
    24import os
    25
    26POSTGRES_CONN_ID = "postgres_default"
    27TEXT_FILE_PATH = "include/book_data.txt"
    28TABLE_NAME = "Book"
    29OPENAI_MODEL = "text-embedding-ada-002"
    30MODEL_VECTOR_LENGTH = 1536
    31
    32
    33def create_embeddings(text: str, model: str):
    34 """Create embeddings for a text with the OpenAI API."""
    35 client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    36 response = client.embeddings.create(input=text, model=model)
    37 embeddings = response.data[0].embedding
    38
    39 return embeddings
    40
    41
    42@dag(
    43 start_date=datetime(2025, 8, 1),
    44 schedule=None,
    45 tags=["pgvector"],
    46 params={
    47 "book_mood": Param(
    48 "A philosophical book about consciousness.",
    49 type="string",
    50 description="Describe the kind of book you want to read.",
    51 ),
    52 },
    53)
    54def query_book_vectors():
    55 enable_vector_extension_if_not_exists = PostgresOperator(
    56 task_id="enable_vector_extension_if_not_exists",
    57 postgres_conn_id=POSTGRES_CONN_ID,
    58 sql="CREATE EXTENSION IF NOT EXISTS vector;",
    59 )
    60
    61 create_table_if_not_exists = PostgresOperator(
    62 task_id="create_table_if_not_exists",
    63 postgres_conn_id=POSTGRES_CONN_ID,
    64 sql=f"""
    65 CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
    66 book_id UUID PRIMARY KEY,
    67 title TEXT,
    68 year INTEGER,
    69 author TEXT,
    70 description TEXT,
    71 vector VECTOR(%(vector_length)s)
    72 );
    73 """,
    74 parameters={"vector_length": MODEL_VECTOR_LENGTH},
    75 )
    76
    77 get_already_imported_book_ids = PostgresOperator(
    78 task_id="get_already_imported_book_ids",
    79 postgres_conn_id=POSTGRES_CONN_ID,
    80 sql=f"""
    81 SELECT book_id
    82 FROM {TABLE_NAME};
    83 """,
    84 )
    85
    86 @task
    87 def import_book_data(text_file_path: str, table_name: str) -> list:
    88 "Read the text file and create a list of dicts from the book information."
    89 with open(text_file_path, "r") as f:
    90 lines = f.readlines()
    91
    92 num_skipped_lines = 0
    93 list_of_params = []
    94 for line in lines:
    95 parts = line.split(":::")
    96 title_year = parts[1].strip()
    97 match = re.match(r"(.+) \((\d{4})\)", title_year)
    98 try:
    99 title, year = match.groups()
    100 year = int(year)
    101 # skip malformed lines
    102 except:
    103 num_skipped_lines += 1
    104 continue
    105
    106 author = parts[2].strip()
    107 description = parts[3].strip()
    108
    109 list_of_params.append(
    110 {
    111 "book_id": str(
    112 uuid.uuid5(
    113 name=" ".join([title, str(year), author, description]),
    114 namespace=uuid.NAMESPACE_DNS,
    115 )
    116 ),
    117 "title": title,
    118 "year": year,
    119 "author": author,
    120 "description": description,
    121 }
    122 )
    123
    124 print(
    125 f"Created a list with {len(list_of_params)} elements "
    126 " while skipping {num_skipped_lines} lines."
    127 )
    128 return list_of_params
    129
    130 @task
    131 def create_embeddings_book_data(
    132 book_data: dict, model: str, already_imported_books: list
    133 ) -> dict:
    134 "Create embeddings for a book description and add them to the book data."
    135 already_imported_books_ids = [x[0] for x in already_imported_books]
    136 if book_data["book_id"] in already_imported_books_ids:
    137 raise AirflowSkipException("Book already imported.")
    138 embeddings = create_embeddings(text=book_data["description"], model=model)
    139 book_data["vector"] = embeddings
    140 return book_data
    141
    142 @task
    143 def create_embeddings_query(model: str, **context) -> list:
    144 "Create embeddings for the user provided book mood."
    145 query = context["params"]["book_mood"]
    146 embeddings = create_embeddings(text=query, model=model)
    147 return embeddings
    148
    149 book_data = import_book_data(text_file_path=TEXT_FILE_PATH, table_name=TABLE_NAME)
    150 book_embeddings = create_embeddings_book_data.partial(
    151 model=OPENAI_MODEL,
    152 already_imported_books=get_already_imported_book_ids.output,
    153 ).expand(book_data=book_data)
    154 query_vector = create_embeddings_query(model=OPENAI_MODEL)
    155
    156 import_embeddings_to_pgvector = PgVectorIngestOperator.partial(
    157 task_id="import_embeddings_to_pgvector",
    158 trigger_rule="none_failed",
    159 conn_id=POSTGRES_CONN_ID,
    160 sql=(
    161 f"INSERT INTO {TABLE_NAME} "
    162 "(book_id, title, year, author, description, vector) "
    163 "VALUES (%(book_id)s, %(title)s, %(year)s, "
    164 "%(author)s, %(description)s, %(vector)s) "
    165 "ON CONFLICT (book_id) DO NOTHING;"
    166 ),
    167 ).expand(parameters=book_embeddings)
    168
    169 get_a_book_suggestion = PostgresOperator(
    170 task_id="get_a_book_suggestion",
    171 postgres_conn_id=POSTGRES_CONN_ID,
    172 trigger_rule="none_failed",
    173 sql=f"""
    174 SELECT title, year, author, description
    175 FROM {TABLE_NAME}
    176 ORDER BY vector <-> CAST(%(query_vector)s AS VECTOR)
    177 LIMIT 1;
    178 """,
    179 parameters={"query_vector": query_vector},
    180 )
    181
    182 @task
    183 def print_suggestion(query_result, **context):
    184 "Print the book suggestion."
    185 query = context["params"]["book_mood"]
    186 book_title = query_result[0][0]
    187 book_year = query_result[0][1]
    188 book_author = query_result[0][2]
    189 book_description = query_result[0][3]
    190 print(f"Book suggestion for '{query}':")
    191 print(
    192 f"You should read {book_title} by {book_author}, published in {book_year}!"
    193 )
    194 print(f"Goodreads describes the book as: {book_description}")
    195
    196 chain(
    197 enable_vector_extension_if_not_exists,
    198 create_table_if_not_exists,
    199 get_already_imported_book_ids,
    200 import_embeddings_to_pgvector,
    201 get_a_book_suggestion,
    202 print_suggestion(query_result=get_a_book_suggestion.output),
    203 )
    204
    205 chain(query_vector, get_a_book_suggestion)
    206 chain(get_already_imported_book_ids, book_embeddings)
    207
    208
    209query_book_vectors()

    This DAG consists of nine tasks that form a simple ML orchestration pipeline.

    • The enable_vector_extension_if_not_exists task uses a PostgresOperator to enable the pgvector extension in the PostgreSQL database.
    • The create_table_if_not_exists task creates the Book table in PostgreSQL. Note the VECTOR() datatype used for the vector column. This datatype is added to PostgreSQL by the pgvector extension and needs to be defined with the vector length of the vectorizer you use as an argument. This example uses OpenAI’s text-embedding-ada-002 model to create 1536-dimensional vectors, so the column is defined with the type VECTOR(1536) using parameterized SQL.
    • The get_already_imported_book_ids task queries the Book table to return all book_id values of books that were already stored with their vectors in previous DAG runs.
    • The import_book_data task uses the @task decorator to read the book data from the book_data.txt file and return it as a list of dictionaries with keys corresponding to the columns of the Book table.
    • The create_embeddings_book_data task is dynamically mapped over the list of dictionaries returned by the import_book_data task to parallelize vector embedding of all book descriptions that have not been added to the Book table in previous DAG runs. The create_embeddings function defines how the embeddings are computed and can be modified to use other embedding models. If all books in the list have already been added to the Book table, then all mapped task instances are skipped. This deduplication relies on deterministic book IDs; see the sketch after this list.
    • The create_embeddings_query task applies the same create_embeddings function to the desired book mood the user provided via Airflow params.
    • The import_embeddings_to_pgvector task uses the PgVectorIngestOperator to insert the book data including the embedding vectors into the PostgreSQL database. This task is dynamically mapped to import the embeddings from one book at a time. The dynamically mapped task instances of books that have already been imported in previous DAG runs are skipped.
    • The get_a_book_suggestion task queries the PostgreSQL database for the book that is most similar to the user-provided mood using nearest neighbor search. Note how the vector of the user-provided book mood (query_vector) is cast to the VECTOR datatype before similarity search: ORDER BY vector <-> CAST(%(query_vector)s AS VECTOR).
    • The print_suggestion task prints the book suggestion to the task logs.
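
    The skipping logic works because uuid5, unlike uuid4, is deterministic: hashing the same book fields always produces the same UUID, so a rerun recognizes books that are already in the table. A quick illustration:

      import uuid

      fields = "The Idea of the World 2019 Bernardo Kastrup <description>"
      first = uuid.uuid5(namespace=uuid.NAMESPACE_DNS, name=fields)
      second = uuid.uuid5(namespace=uuid.NAMESPACE_DNS, name=fields)
      assert first == second  # deterministic, unlike uuid.uuid4()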

    Screenshot of the Airflow UI showing the successful completion of the query_book_vectors DAG in the Grid view with the Graph tab selected.

Tip

For information on more advanced search techniques in pgvector, see the pgvector README.
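
For example, on large tables you can speed up queries with an approximate nearest neighbor index created in an extra task. The following sketch assumes pgvector 0.5.0 or later for HNSW support and uses default index parameters; note that with such an index, pgvector performs approximate rather than exact search.

    from airflow.providers.postgres.operators.postgres import PostgresOperator

    create_hnsw_index = PostgresOperator(
        task_id="create_hnsw_index",
        postgres_conn_id="postgres_default",
        # vector_l2_ops matches the `<->` (L2 distance) operator used in the DAG.
        sql="CREATE INDEX IF NOT EXISTS book_vector_idx ON Book USING hnsw (vector vector_l2_ops);",
    )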

Step 4: Run your DAG

  1. Run astro dev start in your Astro project to start Airflow and open the Airflow UI at localhost:8080.

  2. In the Airflow UI, run the query_book_vectors DAG by clicking the play button. Then, provide the Airflow param for the desired book_mood.

    Screenshot of the Airflow UI showing the input form for the book_mood param.

  3. View your book suggestion in the task logs of the print_suggestion task:

    [2025-08-27, 09:45:54] INFO - Book suggestion for 'A philosophical book about consciousness.':: chan="stdout": source="task"
    [2025-08-27, 09:45:54] INFO - You should read The Idea of the World by Bernardo Kastrup, published in 2019!: chan="stdout": source="task"
    [2025-08-27, 09:45:54] INFO - Goodreads describes the book as: A rigorous case for the primacy of mind in nature, from philosophy to neuroscience, psychology and physics. The Idea of the World offers a grounded alternative to the frenzy of unrestrained abstractions and unexamined assumptions in philosophy and science today. [...]

Step 5: (Optional) Fetch and read the book

  1. Go to the website of your local library and search for the book. If it is available, order it and wait for it to arrive. You will likely need a library card to check out the book.
  2. Make sure to prepare an adequate amount of tea for your reading session. Astronomer recommends Earl Grey, but you can use any tea you like.
  3. Enjoy your book!

Conclusion

Congratulations! You used Airflow and pgvector to get a book suggestion! You can now use Airflow to orchestrate pgvector operations in your own machine learning pipelines. Additionally, you remembered the satisfaction and joy of spending hours reading a good book and supported your local library.