Orchestrate Pinecone operations with Apache Airflow

Info

This page has not yet been updated for Airflow 3. The concepts shown are relevant, but some code may need to be updated. If you run any examples, take care to update import statements and watch for any other breaking changes.

Pinecone is a proprietary vector database platform designed for handling large-scale vector-based AI applications. The Pinecone Airflow provider offers modules to easily integrate Pinecone with Airflow.

In this tutorial you’ll use Airflow to create vector embeddings of series descriptions, create an index in your Pinecone project, ingest the vector embeddings into that index, and query Pinecone to get a suggestion for your next binge-watchable series based on your current mood.

Why use Airflow with Pinecone?

Integrating Pinecone with Airflow provides a robust solution for managing large-scale vector search workflows in your AI applications. Pinecone specializes in efficient vector storage and similarity search, both essential when working with embeddings generated by models such as language transformers or deep neural networks.

By combining Pinecone with Airflow, you can:

  • Use Airflow’s data-driven scheduling to run operations in Pinecone based on upstream events in your data ecosystem, such as when a new model is trained or a new dataset is available.
  • Run dynamic queries with dynamic task mapping, for example to parallelize vector ingestion or search operations to improve performance.
  • Add Airflow features like retries and alerts to your Pinecone operations. Retries protect your MLOps pipelines from transient failures, and alerts notify you of events like task failures or missed service level agreements (SLAs).

Time to complete

This tutorial takes approximately 30 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of the basics of Pinecone, Airflow fundamentals, Airflow decorators, and Airflow connections.

Prerequisites

  • The Astro CLI.
  • A Pinecone account with an API key. You can use a free tier account for this tutorial.
  • An OpenAI API key of at least tier 1 if you want to use OpenAI for vectorization. If you do not want to use OpenAI you can adapt the create_embeddings function at the start of the DAG to use a different vectorizer. Note that you will likely need to adjust the EMBEDDING_MODEL_DIMENSIONS parameter in the DAG if you use a different vectorizer.

Info

The example code from this tutorial is also available on GitHub.

Step 1: Configure your Astro project

  1. Create a new Astro project:

    $ mkdir astro-pinecone-tutorial && cd astro-pinecone-tutorial
    $ astro dev init
  2. Add the following two lines to your requirements.txt file to install the Pinecone Airflow Provider and OpenAI Python client in your Astro project:

    apache-airflow-providers-pinecone==1.0.0
    openai==1.3.2
  3. Add the following environment variables to your Astro project .env file. These variables store the configuration for an Airflow connection to your Pinecone account and allow you to use the OpenAI API. Provide your own values for <your-pinecone-environment> (for example gcp-starter), <your-pinecone-api-key> and <your-openai-api-key>:

    AIRFLOW_CONN_PINECONE_DEFAULT='{
    "conn_type": "pinecone",
    "login": "<your-pinecone-environment>",
    "password": "<your-pinecone-api-key>"
    }'
    OPENAI_API_KEY="<your-openai-api-key>"
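A malformed connection value is a common cause of a broken Pinecone connection. Before starting Airflow, you can sanity-check that the JSON in `AIRFLOW_CONN_PINECONE_DEFAULT` parses cleanly with the Python standard library. This is an optional sketch; the values below are placeholders, not real credentials:

```python
import json

# Paste the value of AIRFLOW_CONN_PINECONE_DEFAULT here (placeholder values shown)
conn_value = """{
"conn_type": "pinecone",
"login": "gcp-starter",
"password": "my-api-key"
}"""

# json.loads raises json.JSONDecodeError if the value is malformed,
# for example because of a trailing comma or unescaped quote
conn = json.loads(conn_value)
print(conn["conn_type"])  # pinecone
```

If this script raises an error, fix the JSON in your .env file before running the DAG.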

Step 2: Add your data

The DAG in this tutorial runs a query on vectorized series descriptions. The descriptions were mostly retrieved from IMDb, with a few additions from domain experts.

  1. In your Astro project include directory, create a file called series_data.txt.

  2. Copy and paste the following text into the file:

    1 ::: Star Trek: Discovery (2017) ::: sci-fi ::: Ten years before Kirk, Spock, and the Enterprise, the USS Discovery discovers new worlds and lifeforms using a new innovative mushroom based propulsion system.
    2 ::: Feel Good (2020) ::: romance ::: The series follows recovering addict and comedian Mae, who is trying to control the addictive behaviors and intense romanticism that permeate every facet of their life.
    3 ::: For All Mankind (2019) ::: sci-fi ::: The series dramatizes an alternate history depicting "what would have happened if the global space race had never ended" after the Soviet Union succeeds in the first crewed Moon landing ahead of the United States.
    4 ::: The Legend of Korra (2012) ::: anime ::: Avatar Korra fights to keep Republic City safe from the evil forces of both the physical and spiritual worlds.
    5 ::: Mindhunter (2017) ::: crime ::: In the late 1970s, two FBI agents broaden the realm of criminal science by investigating the psychology behind murder and end up getting too close to real-life monsters.
    6 ::: The Umbrella Academy (2019) ::: adventure ::: A family of former child heroes, now grown apart, must reunite to continue to protect the world.
    7 ::: Star Trek: Picard (2020) ::: sci-fi ::: Follow-up series to Star Trek: The Next Generation (1987) and Star Trek: Nemesis (2002) that centers on Jean-Luc Picard in the next chapter of his life.
    8 ::: Invasion (2021) ::: sci-fi ::: Earth is visited by an alien species that threatens humanity's existence. Events unfold in real time through the eyes of five ordinary people across the globe as they struggle to make sense of the chaos unraveling around them.
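Each line uses ` ::: ` as a field separator between an index number, the title with its release year, a genre, and a description. As a minimal sketch of how the DAG's import task splits one of these lines (mirroring the parsing logic shown in Step 3):

```python
import re

# One line from series_data.txt: index ::: title (year) ::: genre ::: description
line = "3 ::: For All Mankind (2019) ::: sci-fi ::: The series dramatizes an alternate history."

parts = line.split(":::")
title_year = parts[1].strip()  # "For All Mankind (2019)"

# Split the title from the four-digit year in parentheses
match = re.match(r"(.+) \((\d{4})\)", title_year)
title, year = match.groups()

print(title, int(year))  # For All Mankind 2019
print(parts[2].strip())  # sci-fi
```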

Step 3: Create your DAG

  1. In your Astro project dags folder, create a file called query_series_vectors.py.

  2. Copy the following code into the file:

    1"""
    2## Use the Pinecone Airflow Provider to generate and query vectors for series descriptions
    3
    4This DAG runs a simple MLOps pipeline that uses the Pinecone Airflow Provider to import
    5series descriptions, generate vectors for them, and query the vectors for series based on
    6a user-provided mood.
    7"""
    8
    9from airflow.decorators import dag, task
    10from airflow.models.param import Param
    11from airflow.models.baseoperator import chain
    12from airflow.providers.pinecone.operators.pinecone import PineconeIngestOperator
    13from airflow.providers.pinecone.hooks.pinecone import PineconeHook
    14from pendulum import datetime
    15from openai import OpenAI
    16import uuid
    17import re
    18import os
    19
    20PINECONE_INDEX_NAME = "series-to-watch"
    21DATA_FILE_PATH = "include/series_data.txt"
    22PINECONE_CONN_ID = "pinecone_default"
    23EMBEDDING_MODEL = "text-embedding-ada-002"
    24EMBEDDING_MODEL_DIMENSIONS = 1536
    25
    26
    27def generate_uuid5(identifier: list) -> str:
    28 "Create a UUID5 from a list of strings and return the uuid as a string."
    29 name = "/".join([str(i) for i in identifier])
    30 namespace = uuid.NAMESPACE_DNS
    31 uuid_obj = uuid.uuid5(namespace=namespace, name=name)
    32 return str(uuid_obj)
    33
    34
    35def create_embeddings(text: str, model: str) -> list:
    36 """Create embeddings for a text with the OpenAI API."""
    37 client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    38 response = client.embeddings.create(input=text, model=model)
    39 embeddings = response.data[0].embedding
    40
    41 return embeddings
    42
    43
    44@dag(
    45 start_date=datetime(2023, 10, 18),
    46 schedule=None,
    47 catchup=False,
    48 tags=["Pinecone"],
    49 params={"series_mood": Param("A series about astronauts.", type="string")},
    50)
    51def query_series_vectors():
    52 @task
    53 def import_data_func(text_file_path: str) -> list:
    54 "Import data from a text file and return it as a list of dicts."
    55 with open(text_file_path, "r") as f:
    56 lines = f.readlines()
    57 num_skipped_lines = 0
    58 descriptions = []
    59 data = []
    60 for line in lines:
    61 parts = line.split(":::")
    62 title_year = parts[1].strip()
    63 match = re.match(r"(.+) \((\d{4})\)", title_year)
    64 try:
    65 title, year = match.groups()
    66 year = int(year)
    67 except:
    68 num_skipped_lines += 1
    69 continue
    70
    71 genre = parts[2].strip()
    72 description = parts[3].strip()
    73 descriptions.append(description)
    74 data.append(
    75 {
    76 "id": generate_uuid5(
    77 identifier=[title, year, genre, description]
    78 ), # an `id` property is required for Pinecone
    79 "metadata": {
    80 "title": title,
    81 "year": year,
    82 "genre": genre,
    83 "description": description, # this is the text we'll embed
    84 },
    85 }
    86 )
    87
    88 return data
    89
    90 series_data = import_data_func(text_file_path=DATA_FILE_PATH)
    91
    92 @task
    93 def vectorize_series_data(series_data: dict, model: str) -> dict:
    94 "Create embeddings for the series descriptions."
    95 response = create_embeddings(
    96 text=series_data["metadata"]["description"], model=model
    97 )
    98
    99 series_data["values"] = response
    100
    101 return series_data
    102
    103 vectorized_data = vectorize_series_data.partial(model=EMBEDDING_MODEL).expand(
    104 series_data=series_data
    105 )
    106
    107 @task
    108 def vectorize_user_mood(model: str, **context) -> list:
    109 "Create embeddings for the user mood."
    110 user_mood = context["params"]["series_mood"]
    111 response = create_embeddings(text=user_mood, model=model)
    112
    113 return response
    114
    115 @task
    116 def create_index_if_not_exists(
    117 index_name: str, vector_size: int, pinecone_conn_id: str
    118 ) -> None:
    119 "Create a Pinecone index of the provided name if it doesn't already exist."
    120 hook = PineconeHook(conn_id=pinecone_conn_id)
    121 existing_indexes = hook.list_indexes()
    122 if index_name not in existing_indexes:
    123 newindex = hook.create_index(index_name=index_name, dimension=vector_size)
    124 return newindex
    125 else:
    126 print(f"Index {index_name} already exists")
    127
    128 create_index_if_not_exists_obj = create_index_if_not_exists(
    129 vector_size=EMBEDDING_MODEL_DIMENSIONS,
    130 index_name=PINECONE_INDEX_NAME,
    131 pinecone_conn_id=PINECONE_CONN_ID,
    132 )
    133
    134 pinecone_vector_ingest = PineconeIngestOperator(
    135 task_id="pinecone_vector_ingest",
    136 conn_id=PINECONE_CONN_ID,
    137 index_name=PINECONE_INDEX_NAME,
    138 input_vectors=vectorized_data,
    139 )
    140
    141 @task
    142 def query_pinecone(
    143 index_name: str,
    144 pinecone_conn_id: str,
    145 vectorized_user_mood: list,
    146 ) -> None:
    147 "Query the Pinecone index with the user mood and print the top result."
    148 hook = PineconeHook(conn_id=pinecone_conn_id)
    149
    150 query_response = hook.query_vector(
    151 index_name=index_name,
    152 top_k=1,
    153 include_values=True,
    154 include_metadata=True,
    155 vector=vectorized_user_mood,
    156 )
    157
    158 print("You should watch: " + query_response["matches"][0]["metadata"]["title"])
    159 print("Description: " + query_response["matches"][0]["metadata"]["description"])
    160
    161 query_pinecone_obj = query_pinecone(
    162 index_name=PINECONE_INDEX_NAME,
    163 pinecone_conn_id=PINECONE_CONN_ID,
    164 vectorized_user_mood=vectorize_user_mood(model=EMBEDDING_MODEL),
    165 )
    166
    167 chain(
    168 create_index_if_not_exists_obj,
    169 pinecone_vector_ingest,
    170 query_pinecone_obj,
    171 )
    172
    173
    174query_series_vectors()

    This DAG consists of six tasks to make a simple ML orchestration pipeline.

    • The import_data_func task defined with the @task decorator reads the data from the series_data.txt file and returns a list of dictionaries containing the series title, year, genre, and description. Note that the task creates a UUID for each series using the generate_uuid5 function and adds it to the id key. A unique ID for each series is required for the Pinecone ingestion task.
    • The vectorize_series_data task is a dynamic task that creates one mapped task instance for each series in the list returned by the import_data_func task. The task uses the create_embeddings function to generate vector embeddings for each series’ description. Note that if you want to use a different vectorizer than OpenAI’s text-embedding-ada-002, you can adjust this function to return your preferred vectors and set the EMBEDDING_MODEL_DIMENSIONS parameter in the DAG to the vector size of your model.
    • The vectorize_user_mood task calls the create_embeddings function to generate vector embeddings for the mood the user can provide as an Airflow param.
    • The create_index_if_not_exists task uses the PineconeHook to connect to your Pinecone instance and retrieve the current list of indexes in your Pinecone environment. If no index of the name PINECONE_INDEX_NAME exists yet, the task will create it. Note that with a free tier Pinecone account you can only have one index.
    • The pinecone_vector_ingest task uses the PineconeIngestOperator to ingest the vectorized series data into the index created by the create_index_if_not_exists task.
    • The query_pinecone task performs a vector search in Pinecone to get the series most closely matching the user-provided mood and prints the result to the task logs.
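A useful property of the generate_uuid5 helper is that it is deterministic: an unchanged series always maps to the same id, so rerunning the DAG upserts the same records in Pinecone rather than creating duplicates. A minimal sketch of that behavior (the identifier values below are made up for illustration):

```python
import uuid

def generate_uuid5(identifier: list) -> str:
    # Deterministic: the same inputs always produce the same UUID5
    name = "/".join(str(i) for i in identifier)
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, name))

# The same series data yields the same id on every DAG run
id_a = generate_uuid5(["For All Mankind", 2019, "sci-fi", "An alternate space-race history."])
id_b = generate_uuid5(["For All Mankind", 2019, "sci-fi", "An alternate space-race history."])
print(id_a == id_b)  # True

# Any change to the data produces a different id
id_c = generate_uuid5(["For All Mankind", 2019, "sci-fi", "A different description."])
print(id_a == id_c)  # False
```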

    A screenshot from the Airflow UI's Grid view with the Graph tab selected showing a successful run of the query_series_vectors DAG.

Step 4: Run your DAG

  1. Open your Astro project, then run astro dev start to start Airflow locally.

  2. Open the Airflow UI at localhost:8080, then run the query_series_vectors DAG by clicking the play button. Provide your input to the Airflow param for series_mood.

    A screenshot of the Trigger DAG view in the Airflow UI showing the mood A series about Astronauts being provided to the series_mood param.

  3. View your series suggestion in the task logs of the query_pinecone task:

    [2023-11-20, 14:03:48 UTC] {logging_mixin.py:154} INFO - You should watch: For All Mankind
    [2023-11-20, 14:03:48 UTC] {logging_mixin.py:154} INFO - Description: The series dramatizes an alternate history depicting "what would have happened if the global space race had never ended" after the Soviet Union succeeds in the first crewed Moon landing ahead of the United States.

Tip

When watching For All Mankind, make sure to have a tab with Wikipedia open to compare the alternate timeline with ours and remember, flying spacecraft isn’t like driving a car. It doesn’t just go where you point it.

Conclusion

Congrats! You’ve successfully integrated Airflow and Pinecone! You can now use this tutorial as a starting point to build your own AI applications with Airflow and Pinecone.