Orchestrate OpenAI operations with Apache Airflow

OpenAI is an AI research and deployment company that provides an API for accessing state-of-the-art models like GPT-4 and DALL·E 3. The OpenAI Airflow provider offers modules to easily integrate OpenAI with Airflow.

In this tutorial you’ll use Airflow and the OpenAI Airflow provider to ask a question to Star Trek captains, create embeddings of the answers from each captain, and plot them in two dimensions.

Why use Airflow with OpenAI?

OpenAI offers a variety of powerful model endpoints for different tasks like text generation, vector embedding, and translation. These models are used in both user-facing applications, such as chatbots, and internal applications, such as a smart search for internal knowledge base content.
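
For reference, this is roughly what calling two of these endpoints looks like with the plain openai Python client pinned later in this tutorial (openai==0.28.1). The question and answer strings are placeholders, and in the tutorial DAG the API key comes from an Airflow connection rather than being set directly.

    import openai

    openai.api_key = "<your-openai-api-key>"  # the tutorial DAG pulls the key from an Airflow connection instead

    # Text generation with the chat completion endpoint
    chat = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Which is your favorite ship?"}],
    )
    print(chat.choices[0]["message"]["content"])

    # Vector embedding of a piece of text
    embedding = openai.Embedding.create(
        model="text-embedding-ada-002",
        input="The USS Enterprise, without question.",
    )
    print(len(embedding["data"][0]["embedding"]))  # text-embedding-ada-002 returns 1536-dimensional vectors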

Integrating OpenAI with Airflow into an end-to-end machine learning pipeline allows you to:

  • Use Airflow’s data-driven scheduling to run operations using OpenAI model endpoints based on upstream events in your data ecosystem, such as when new user input is ingested or a new dataset is available (see the sketch after this list).
  • Send several requests to a model endpoint in parallel based on upstream events in your data ecosystem or user input via Airflow params.
  • Monitor the OpenAI service using Airflow alerts and protect against API rate limits and outages with Airflow retries.
  • Use Airflow to orchestrate the creation of vector embeddings using OpenAI models, which is especially useful for large datasets that can’t be processed automatically by vector databases.
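
For example, the following minimal sketch combines data-driven scheduling with retries. All names in it are hypothetical: the DAG waits for an upstream "user_input" dataset to be updated and retries the OpenAI call on transient failures such as rate limits.

    from datetime import timedelta

    from airflow.datasets import Dataset
    from airflow.decorators import dag, task
    from pendulum import datetime


    @dag(
        start_date=datetime(2023, 11, 1),
        schedule=[Dataset("s3://ingest/user_input")],  # hypothetical upstream dataset
        catchup=False,
    )
    def openai_downstream_example():
        @task(retries=3, retry_delay=timedelta(minutes=1))  # protects against rate limits and brief outages
        def call_openai_endpoint():
            ...  # call an OpenAI model endpoint here, as in the DAG built in this tutorial

        call_openai_endpoint()


    openai_downstream_example()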

Time to complete

This tutorial takes approximately 15 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

  • The basics of Airflow, such as DAGs, tasks, and connections.
  • Airflow params.
  • Dynamic task mapping.
  • The basics of the OpenAI API.

Prerequisites

  • The Astro CLI.
  • An OpenAI API key.

Step 1: Configure your Astro project

  1. Create a new Astro project:

    $ mkdir astro-openai-tutorial && cd astro-openai-tutorial
    $ astro dev init
  2. Add the following lines to your requirements.txt file to install the OpenAI Airflow provider and other supporting packages:

    apache-airflow-providers-openai==1.0.0
    openai==0.28.1
    matplotlib==3.8.1
    seaborn==0.13.0
    scikit-learn==1.3.2
    pandas==1.5.3
    numpy==1.26.2
    adjustText==0.8
  3. To create an Airflow connection to OpenAI, add the following environment variable to your .env file. Make sure to replace <your-openai-api-key> with your own OpenAI API key. You can optionally verify the connection with the sketch below.

    AIRFLOW_CONN_OPENAI_DEFAULT='{
    "conn_type": "openai",
    "password": "<your-openai-api-key>"
    }'
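
To confirm that Airflow picks up this connection, you can run a small throwaway DAG that pulls the key the same way the tutorial DAG does, via the OpenAIHook. The DAG and task names below are hypothetical.

    from airflow.decorators import dag, task
    from airflow.providers.openai.hooks.openai import OpenAIHook
    from pendulum import datetime


    @dag(start_date=datetime(2023, 11, 1), schedule=None, catchup=False)
    def check_openai_connection():
        @task
        def check():
            # _get_api_key() is the same (private) helper the tutorial DAG uses
            api_key = OpenAIHook(conn_id="openai_default")._get_api_key()
            print(f"Loaded an API key of length {len(api_key)} from openai_default.")

        check()


    check_openai_connection()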

Step 2: Create your DAG

  1. In your dags folder, create a file called captains_dag.py.

  2. Copy the following code into the file.

    1"""
    2## Ask questions to Star Trek captains using OpenAI's LLMs, embed and visualize the results
    3
    4This DAG shows how to use the OpenAI Airflow provider to interact with the OpenAI API.
    5The DAG asks a question to a list of Star Trek captains based on values you provide via
    6Airflow params, embeds the responses using the OpenAI text-embedding-ada-002 model,
    7and visualizes the embeddings in 2 dimensions using PCA, matplotlib and seaborn.
    8"""
    9
    10from airflow.decorators import dag, task
    11from airflow.models.param import Param
    12from airflow.models.baseoperator import chain
    13from airflow.providers.openai.hooks.openai import OpenAIHook
    14from airflow.providers.openai.operators.openai import OpenAIEmbeddingOperator
    15from sklearn.metrics.pairwise import euclidean_distances
    16from sklearn.decomposition import PCA
    17from adjustText import adjust_text
    18from pendulum import datetime
    19import matplotlib.pyplot as plt
    20import seaborn as sns
    21import pandas as pd
    22import numpy as np
    23import openai
    24
    25
    26OPENAI_CONN_ID = "openai_default"
    27IMAGE_PATH = "include/captains_plot.png"
    28
    29star_trek_captains_list = [
    30 "James T. Kirk",
    31 "Jean-Luc Picard",
    32 "Benjamin Sisko",
    33 "Kathryn Janeway",
    34 "Jonathan Archer",
    35 "Christopher Pike",
    36 "Michael Burnham",
    37 "Saru",
    38]
    39
    40
    41@dag(
    42 start_date=datetime(2023, 11, 1),
    43 schedule=None,
    44 catchup=False,
    45 params={
    46 "question": Param(
    47 "Which is your favorite ship?",
    48 type="string",
    49 title="Question to ask the captains",
    50 description="Enter what you would like to ask the captains.",
    51 min_length=1,
    52 max_length=500,
    53 ),
    54 "captains_to_ask": Param(
    55 star_trek_captains_list,
    56 type="array",
    57 description="List the captains whose answers you would like to compare. "
    58 + "Suggestions: "
    59 + ", ".join(star_trek_captains_list),
    60 ),
    61 "max_tokens_answer": Param(
    62 100,
    63 type="integer",
    64 description="Maximum number of tokens to generate for the answer.",
    65 ),
    66 "randomness_of_answer": Param(
    67 10,
    68 type="integer",
    69 description=(
    70 "Enter the desired randomness of the answer on a scale"
    71 + "from 0 (no randomness) to 20 (full randomness). "
    72 + "This setting corresponds to 10x the temperature setting in the OpenAI API."
    73 ),
    74 min=0,
    75 max=20,
    76 ),
    77 },
    78)
    79def captains_dag():
    80 @task
    81 def get_captains_list(**context):
    82 "Pull the list of captains to ask from the context."
    83 captains_list = context["params"]["captains_to_ask"]
    84 return captains_list
    85
    86 @task
    87 def ask_a_captain(open_ai_conn_id: str, captain_to_ask, **context):
    88 "Ask a captain a question using gpt-3.5-turbo."
    89 question = context["params"]["question"]
    90 max_tokens_answer = context["params"]["max_tokens_answer"]
    91 randomness_of_answer = context["params"]["randomness_of_answer"]
    92 hook = OpenAIHook(conn_id=open_ai_conn_id)
    93 openai.api_key = hook._get_api_key()
    94
    95 response = openai.ChatCompletion.create(
    96 model="gpt-3.5-turbo",
    97 messages=[
    98 {"role": "system", "content": f"You are captain {captain_to_ask}."},
    99 {"role": "user", "content": question},
    100 ],
    101 temperature=randomness_of_answer / 10,
    102 max_tokens=max_tokens_answer,
    103 )
    104
    105 response = response.choices[0]["message"]["content"]
    106
    107 print(f"Your Question: {question}")
    108 print(f"Captain {captain_to_ask} said: {response}")
    109
    110 return response
    111
    112 captains_list = get_captains_list()
    113 captain_responses = ask_a_captain.partial(open_ai_conn_id=OPENAI_CONN_ID).expand(
    114 captain_to_ask=captains_list
    115 )
    116
    117 get_embeddings = OpenAIEmbeddingOperator.partial(
    118 task_id="get_embeddings",
    119 conn_id=OPENAI_CONN_ID,
    120 model="text-embedding-ada-002",
    121 ).expand(input_text=captain_responses)
    122
    123 @task
    124 def plot_embeddings(embeddings, text_labels, file_name="embeddings_plot.png"):
    125 "Plot the embeddings of the captain responses."
    126 pca = PCA(n_components=2)
    127 reduced_embeddings = pca.fit_transform(embeddings)
    128
    129 plt.figure(figsize=(10, 8))
    130 df_embeddings = pd.DataFrame(reduced_embeddings, columns=["PC1", "PC2"])
    131 sns.scatterplot(
    132 df_embeddings, x="PC1", y="PC2", s=100, color="gold", edgecolor="black"
    133 )
    134
    135 font_style = {"color": "black"}
    136 texts = []
    137 for i, label in enumerate(text_labels):
    138 texts.append(
    139 plt.text(
    140 reduced_embeddings[i, 0],
    141 reduced_embeddings[i, 1],
    142 label,
    143 fontdict=font_style,
    144 fontsize=15,
    145 )
    146 )
    147
    148 # prevent overlapping labels
    149 adjust_text(texts, arrowprops=dict(arrowstyle="->", color="red"))
    150
    151 distances = euclidean_distances(reduced_embeddings)
    152 np.fill_diagonal(distances, np.inf) # exclude cases where the distance is 0
    153
    154 n = distances.shape[0]
    155 distances_list = [
    156 (distances[i, j], (i, j)) for i in range(n) for j in range(i + 1, n)
    157 ]
    158
    159 distances_list.sort(reverse=True)
    160
    161 legend_handles = []
    162 for dist, (i, j) in distances_list:
    163 (line,) = plt.plot(
    164 [reduced_embeddings[i, 0], reduced_embeddings[j, 0]],
    165 [reduced_embeddings[i, 1], reduced_embeddings[j, 1]],
    166 "gray",
    167 linestyle="--",
    168 alpha=0.3,
    169 )
    170 legend_handles.append(line)
    171
    172 legend_labels = [
    173 f"{text_labels[i]} - {text_labels[j]}: {dist:.2f}"
    174 for dist, (i, j) in distances_list
    175 ]
    176
    177 for i in range(len(reduced_embeddings)):
    178 for j in range(i + 1, len(reduced_embeddings)):
    179 plt.plot(
    180 [reduced_embeddings[i, 0], reduced_embeddings[j, 0]],
    181 [reduced_embeddings[i, 1], reduced_embeddings[j, 1]],
    182 "gray",
    183 linestyle="--",
    184 alpha=0.5,
    185 )
    186
    187 plt.legend(
    188 legend_handles,
    189 legend_labels,
    190 title="Distances",
    191 loc="center left",
    192 bbox_to_anchor=(1, 0.5),
    193 )
    194
    195 plt.tight_layout()
    196 plt.title(
    197 "2D Visualization of captain responses", fontsize=16, fontweight="bold"
    198 )
    199 plt.xlabel("PCA Component 1", fontdict=font_style)
    200 plt.ylabel("PCA Component 2", fontdict=font_style)
    201
    202 plt.savefig(file_name, bbox_inches="tight")
    203 plt.close()
    204
    205 chain(
    206 get_embeddings,
    207 plot_embeddings(
    208 get_embeddings.output,
    209 text_labels=captains_list,
    210 file_name=IMAGE_PATH,
    211 ),
    212 )
    213
    214
    215captains_dag()

    This DAG consists of four tasks to make a simple MLOps pipeline.

    • The get_captains_list task fetches the list of Star Trek captains you want to ask your question to. You’ll provide the list of captains when you run the DAG with Airflow params.
    • The ask_a_captain task uses the OpenAIHook to connect to the OpenAI API. It then uses the chat completion endpoint to generate answers to the question you provide. This task is dynamically mapped over the list of captains to generate one dynamically mapped task instance per captain.
    • The get_embeddings task is defined using the OpenAIEmbeddingOperator to generate vector embeddings of the answers generated by the upstream ask_a_captain task. This task is dynamically mapped over the list of answers to retrieve one set of embeddings per answer. This pattern allows for efficient parallelization of the vector embedding generation.
    • The plot_embeddings task takes the embeddings created by the upstream task and performs dimensionality reduction using PCA to plot the embeddings in two dimensions. A standalone sketch of this logic follows this list.
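
    To see what the plotting logic does numerically, here is a hedged, standalone sketch of the same PCA and pairwise-distance steps. The labels and 4-dimensional toy embeddings are placeholders; in the DAG, the inputs are 1536-dimensional text-embedding-ada-002 embeddings.

    import numpy as np
    from sklearn.decomposition import PCA
    from sklearn.metrics.pairwise import euclidean_distances

    labels = ["James T. Kirk", "Kathryn Janeway", "Saru"]
    toy_embeddings = np.array(
        [
            [0.1, 0.3, 0.5, 0.7],
            [0.2, 0.1, 0.4, 0.8],
            [0.9, 0.2, 0.1, 0.3],
        ]
    )

    # Project the vectors down to 2 dimensions, as the plot_embeddings task does
    reduced = PCA(n_components=2).fit_transform(toy_embeddings)

    # Pairwise distances between the reduced points, which the DAG lists in the plot legend
    distances = euclidean_distances(reduced)
    for i in range(len(labels)):
        for j in range(i + 1, len(labels)):
            print(f"{labels[i]} - {labels[j]}: {distances[i, j]:.2f}")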

    Screenshot of the Airflow UI showing the successful completion of the captains_dag DAG in the Grid view with the Graph tab selected. All 8 available captains were selected to be asked the question, which led to 8 mapped task instances of both the ask_a_captain and get_embeddings tasks.

Step 3: Run your DAG

  1. Run astro dev start in your Astro project to start Airflow, then open the Airflow UI at localhost:8080.

  2. In the Airflow UI, run the captains_dag DAG by clicking the play button. Then, provide values for the following Airflow params (you can also trigger the DAG from the command line, as sketched after this step):

    • Question to ask the captains: The question you want to ask the captains.
    • captains_to_ask: A list of Star Trek captains you want to ask the question to. Make sure to create one line per captain and to provide at least two names.
    • max_tokens_answer: The maximum number of tokens available for the answer.
    • randomness_of_answer: The randomness of the answer. The value provided is divided by 10 and given to the temperature parameter of the chat completion endpoint. The scale for the param ranges from 0 to 20, with 0 being the most deterministic and 20 being the most random.

    Screenshot of the Airflow UI showing the params available for the captains_dag DAG with the default choices.
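
    Alternatively, you can trigger the DAG with the same params from the command line. The following command is a sketch, assuming the Astro CLI and the default param keys shown above; the values are placeholders.

    $ astro dev run dags trigger captains_dag \
        --conf '{"question": "Which is your favorite ship?", "captains_to_ask": ["James T. Kirk", "Kathryn Janeway"], "max_tokens_answer": 100, "randomness_of_answer": 10}'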

  3. After the DAG run completes, go to the include folder to view the image file created by the plot_embeddings task. The image should look similar to the one below.

    Screenshot of the image created by the plot_embeddings task showing the two dimensional representation of the closeness of answers associated with different Star Trek captains.

Conclusion

Congratulations! You used Airflow and OpenAI to get answers from your favorite Star Trek captains and compare them visually. You can now use Airflow to orchestrate OpenAI operations in your own machine learning pipelines. 🖖