Orchestrate Cohere LLMs with Apache Airflow

Info

This page has not yet been updated for Airflow 3. The concepts shown are relevant, but some code may need to be updated. If you run any examples, take care to update import statements and watch for any other breaking changes.

Cohere is a natural language processing (NLP) platform that provides an API to access cutting-edge large language models (LLMs). The Cohere Airflow provider offers modules to easily integrate Cohere with Airflow.

In this tutorial, you use Airflow and the Cohere Airflow provider to generate recipe suggestions based on a list of ingredients and countries of recipe origin. Additionally, you create embeddings of the recipes and perform dimensionality reduction using principal component analysis (PCA) to plot recipe similarity in two dimensions.

Why use Airflow with Cohere?

Cohere provides highly specialized out-of-the-box and custom LLMs. Countless applications use these models for both user-facing needs, such as moderating user-generated content, and internal purposes, like providing insight into customer support tickets.

Integrating Cohere with Airflow into one end-to-end machine learning pipeline allows you to:

  • Use Airflow’s data-driven scheduling to run operations with Cohere LLM endpoints based on upstream events in your data ecosystem, like when new user input is ingested or a new dataset is available.
  • Send several requests to a model endpoint in parallel based on upstream events in your data ecosystem or user input with Airflow params.
  • Add Airflow features like retries and alerts to your Cohere operations. This is critical for day 2 MLOps operations, for example, to handle model service outages.
  • Use Airflow to orchestrate the creation of vector embeddings with Cohere models, which is especially useful for very large datasets that cannot be processed automatically by vector databases.

Time to complete

This tutorial takes approximately 15 minutes to complete (cooking your recommended recipe not included).

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

Prerequisites

  • The Astro CLI.
  • A Cohere API key. You can generate an API key in the Cohere dashboard, accessible with a Cohere account. A free tier API key is sufficient for this tutorial.

Step 1: Configure your Astro project

  1. Create a new Astro project:

    $ mkdir astro-cohere-tutorial && cd astro-cohere-tutorial
    $ astro dev init
  2. Add the following lines to your requirements.txt file to install the Cohere Airflow provider and other supporting packages:

    apache-airflow-providers-cohere==1.0.0
    matplotlib==3.8.1
    seaborn==0.13.0
    scikit-learn==1.3.2
    pandas==1.5.3
    numpy==1.26.2
    adjustText==0.8
  3. To create an Airflow connection to Cohere, add the following environment variable to your .env file. Make sure to replace <your-cohere-api-key> with your own Cohere API key.

    AIRFLOW_CONN_COHERE_DEFAULT='{
    "conn_type": "cohere",
    "password": "<your-cohere-api-key>"
    }'
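
    Airflow can only build the connection if the value of AIRFLOW_CONN_COHERE_DEFAULT is valid JSON. A quick sanity check with the standard library, before starting the project, catches quoting mistakes early (the placeholder below is of course not a real API key):

    ```python
    import json

    # Paste the value of AIRFLOW_CONN_COHERE_DEFAULT, without the outer
    # single quotes, and confirm it parses as JSON.
    conn = json.loads('{"conn_type": "cohere", "password": "<your-cohere-api-key>"}')
    print(conn["conn_type"])  # cohere
    ```

    If `json.loads` raises a `JSONDecodeError`, fix the connection string before starting Airflow.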

Step 2: Create your DAG

  1. In your dags folder, create a file called recipe_suggestions.py.

  2. Copy the following code into the file.

    1"""
    2## Get recipe suggestions using Cohere's LLMs, embed and visualize the results
    3
    4This DAG shows how to use the Cohere Airflow provider to interact with the Cohere API.
    5The DAG generates recipes based on user input via Airflow params, embeds the
    6responses using Cohere embeddings, and visualizes them in 2 dimensions using PCA,
    7matplotlib and seaborn.
    8"""
    9
    10from airflow.decorators import dag, task
    11from airflow.models.param import Param
    12from airflow.models.baseoperator import chain
    13from airflow.providers.cohere.hooks.cohere import CohereHook
    14from airflow.providers.cohere.operators.embedding import CohereEmbeddingOperator
    15from sklearn.metrics.pairwise import euclidean_distances
    16from sklearn.decomposition import PCA
    17from adjustText import adjust_text
    18from pendulum import datetime
    19import matplotlib.pyplot as plt
    20import seaborn as sns
    21import pandas as pd
    22import numpy as np
    23
    24
    25COHERE_CONN_ID = "cohere_default"
    26IMAGE_PATH = "include/recipe_plot.png"
    27
    28
    29@dag(
    30 start_date=datetime(2023, 11, 1),
    31 schedule=None,
    32 catchup=False,
    33 params={
    34 "countries": Param(
    35 ["Switzerland", "Norway", "New Zealand", "Cameroon", "Bhutan", "Chile"],
    36 type="array",
    37 title="Countries of recipe origin",
    38 description="Enter from which countries you would like to get recipes."
    39 + "List at least two countries.",
    40 ),
    41 "pantry_ingredients": Param(
    42 ["gruyere", "olives", "potatoes", "onions", "pineapple"],
    43 type="array",
    44 description="List the ingredients you have in your pantry, you'd like to use",
    45 ),
    46 "type": Param(
    47 "vegetarian",
    48 type="string",
    49 enum=["vegan", "vegetarian", "omnivore"],
    50 description="Select the type of recipe you'd like to get.",
    51 ),
    52 "max_tokens_recipe": Param(
    53 500,
    54 type="integer",
    55 description="Enter the max number of tokens the model should generate.",
    56 ),
    57 "randomness_of_recipe": Param(
    58 25,
    59 type="integer",
    60 description=(
    61 "Enter the desired randomness of the recipe on a scale"
    62 + "from 0 (no randomness) to 50 (full randomness). "
    63 + "This setting corresponds to 10x the temperature setting in the Cohere API."
    64 ),
    65 ),
    66 },
    67)
    68def recipe_suggestions():
    69 @task
    70 def get_countries_list(**context):
    71 "Pull the list of countries from the context."
    72 countries = context["params"]["countries"]
    73 return countries
    74
    75 @task
    76 def get_ingredients_list(**context):
    77 "Pull the list of ingredients from the context."
    78 ingredients = context["params"]["pantry_ingredients"]
    79 return ingredients
    80
    81 @task
    82 def get_a_recipe(
    83 cohere_conn_id: str, country: str, ingredients_list: list, **context
    84 ):
    85 "Get recipes from the Cohere API for your pantry ingredients for a given country."
    86 type = context["params"]["type"]
    87 max_tokens_answer = context["params"]["max_tokens_recipe"]
    88 randomness_of_answer = context["params"]["randomness_of_recipe"]
    89 co = CohereHook(conn_id=cohere_conn_id).get_conn
    90
    91 response = co.generate(
    92 model="command",
    93 prompt=f"Please provide a delicious {type} recipe from {country} "
    94 + f"that uses as many of these ingredients: {', '.join(ingredients_list)} as possible, "
    95 + "if you can't find a recipe that uses all of them, suggest an additional desert."
    96 + "Bonus points if it's a traditional recipe from that country, "
    97 + "you can name the city or region it's from and you can provide "
    98 + "vegan alternatives for the ingredients."
    99 + "Provide the full recipe with all steps and ingredients.",
    100 max_tokens=max_tokens_answer,
    101 temperature=randomness_of_answer / 10,
    102 )
    103
    104 recipe = response.generations[0].text
    105
    106 print(f"Your recipe from {country}")
    107 print(f"for the ingredients {', '.join(ingredients_list)} is:")
    108 print(recipe)
    109
    110 with open(f"include/{country}_recipe.txt", "w") as f:
    111 f.write(recipe)
    112
    113 return recipe
    114
    115 countries_list = get_countries_list()
    116 ingredients_list = get_ingredients_list()
    117 recipes_list = get_a_recipe.partial(
    118 cohere_conn_id=COHERE_CONN_ID, ingredients_list=ingredients_list
    119 ).expand(country=countries_list)
    120
    121 get_embeddings = CohereEmbeddingOperator.partial(
    122 task_id="get_embeddings",
    123 conn_id=COHERE_CONN_ID,
    124 ).expand(input_text=recipes_list)
    125
    126 @task
    127 def plot_embeddings(embeddings, text_labels, file_name="embeddings_plot.png"):
    128 "Plot the embeddings of the recipes."
    129
    130 embeddings = [x[0] for x in embeddings]
    131 print(text_labels)
    132
    133 pca = PCA(n_components=2)
    134 reduced_embeddings = pca.fit_transform(embeddings)
    135
    136 plt.figure(figsize=(10, 8))
    137 df_embeddings = pd.DataFrame(reduced_embeddings, columns=["PC1", "PC2"])
    138 sns.scatterplot(
    139 df_embeddings, x="PC1", y="PC2", s=100, color="gold", edgecolor="black"
    140 )
    141
    142 font_style = {"color": "black"}
    143 texts = []
    144 for i, label in enumerate(text_labels):
    145 texts.append(
    146 plt.text(
    147 reduced_embeddings[i, 0],
    148 reduced_embeddings[i, 1],
    149 label,
    150 fontdict=font_style,
    151 fontsize=15,
    152 )
    153 )
    154
    155 # prevent overlapping labels
    156 adjust_text(texts, arrowprops=dict(arrowstyle="->", color="red"))
    157
    158 distances = euclidean_distances(reduced_embeddings)
    159 np.fill_diagonal(distances, np.inf) # exclude cases where the distance is 0
    160
    161 n = distances.shape[0]
    162 distances_list = [
    163 (distances[i, j], (i, j)) for i in range(n) for j in range(i + 1, n)
    164 ]
    165
    166 distances_list.sort(reverse=True)
    167
    168 legend_handles = []
    169 for dist, (i, j) in distances_list:
    170 (line,) = plt.plot(
    171 [reduced_embeddings[i, 0], reduced_embeddings[j, 0]],
    172 [reduced_embeddings[i, 1], reduced_embeddings[j, 1]],
    173 "gray",
    174 linestyle="--",
    175 alpha=0.3,
    176 )
    177 legend_handles.append(line)
    178
    179 legend_labels = [
    180 f"{text_labels[i]} - {text_labels[j]}: {dist:.2f}"
    181 for dist, (i, j) in distances_list
    182 ]
    183
    184 for i in range(len(reduced_embeddings)):
    185 for j in range(i + 1, len(reduced_embeddings)):
    186 plt.plot(
    187 [reduced_embeddings[i, 0], reduced_embeddings[j, 0]],
    188 [reduced_embeddings[i, 1], reduced_embeddings[j, 1]],
    189 "gray",
    190 linestyle="--",
    191 alpha=0.5,
    192 )
    193
    194 plt.legend(
    195 legend_handles,
    196 legend_labels,
    197 title="Distances",
    198 loc="center left",
    199 bbox_to_anchor=(1, 0.5),
    200 )
    201
    202 plt.tight_layout()
    203 plt.title(
    204 "2D Visualization of recipe similarities", fontsize=16, fontweight="bold"
    205 )
    206 plt.xlabel("PCA Component 1", fontdict=font_style)
    207 plt.ylabel("PCA Component 2", fontdict=font_style)
    208
    209 plt.savefig(file_name, bbox_inches="tight")
    210 plt.close()
    211
    212 chain(
    213 get_embeddings,
    214 plot_embeddings(
    215 get_embeddings.output,
    216 text_labels=countries_list,
    217 file_name=IMAGE_PATH,
    218 ),
    219 )
    220
    221
    222recipe_suggestions()

    This DAG consists of five tasks to make a simple MLOps pipeline.

    • The get_ingredients_list task fetches the list of ingredients that the user found in their pantry and wants to use in their recipe. The input pantry_ingredients param is provided by Airflow params when you run the DAG.
    • The get_countries_list task uses Airflow params to retrieve the list of user-provided countries to get recipes from.
    • The get_a_recipe task uses the CohereHook to connect to the Cohere API and use the /generate endpoint to get a tasty recipe suggestion based on the user’s pantry ingredients and one of the countries they provided. This task is dynamically mapped over the list of countries to generate one task instance per country. The recipes are saved as .txt files in the include folder.
    • The get_embeddings task is defined using the CohereEmbeddingOperator to generate vector embeddings of the recipes generated by the upstream get_a_recipe task. This task is dynamically mapped over the list of recipes to retrieve one set of embeddings per recipe. This pattern allows for efficient parallelization of the vector embedding generation.
    • The plot_embeddings task takes the embeddings created by the upstream task and performs dimensionality reduction using PCA to plot the embeddings in two dimensions.
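
    The dimensionality reduction and pairwise-distance logic inside plot_embeddings can be sketched in isolation. This is a minimal example that uses random stand-in vectors in place of real Cohere embeddings (which are much higher-dimensional); the country labels are just the tutorial's defaults:

    ```python
    import numpy as np
    from sklearn.decomposition import PCA
    from sklearn.metrics.pairwise import euclidean_distances

    # Hypothetical stand-in embeddings: 4 "recipes" as random 8-dimensional
    # vectors. Real Cohere embeddings would come from the upstream task.
    rng = np.random.default_rng(42)
    embeddings = rng.normal(size=(4, 8))
    labels = ["Switzerland", "Norway", "Bhutan", "Chile"]

    # Reduce to two dimensions, as plot_embeddings does before plotting.
    pca = PCA(n_components=2)
    reduced = pca.fit_transform(embeddings)
    print(reduced.shape)  # (4, 2)

    # Pairwise distances between the reduced points, largest first,
    # mirroring the legend labels in the DAG.
    distances = euclidean_distances(reduced)
    pairs = sorted(
        ((distances[i, j], (i, j)) for i in range(4) for j in range(i + 1, 4)),
        reverse=True,
    )
    for dist, (i, j) in pairs:
        print(f"{labels[i]} - {labels[j]}: {dist:.2f}")
    ```

    Note that PCA to two components is lossy; the plot shows relative similarity of the recipe texts, not their exact distances in the original embedding space.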

    Screenshot of the Airflow UI showing the successful completion of the recipe_suggestions DAG in the Grid view with the Graph tab selected. 6 countries were provided to get recipe suggestions from, which led to 6 mapped task instances of both the get_a_recipe and get_embeddings tasks.

Step 3: Run your DAG

  1. Run astro dev start in your Astro project to start Airflow and open the Airflow UI at localhost:8080.

  2. In the Airflow UI, run the recipe_suggestions DAG by clicking the play button. Then, provide Airflow params for:

    • Countries of recipe origin: A list of the countries you want to get recipe suggestions from. Make sure to create one line per country and to provide at least two countries.
    • pantry_ingredients: A list of the ingredients you have in your pantry and want to use in the recipe. Make sure to create one line per ingredient.
    • type: Select your preferred recipe type.
    • max_tokens_recipe: The maximum number of tokens available for the recipe.
    • randomness_of_recipe: The randomness of the recipe. The value provided is divided by 10 and passed to the temperature parameter of the Cohere API. The scale for the param ranges from 0 to 50, with 0 being the most deterministic and 50 being the most random.
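
    The param-to-temperature mapping described above is a simple division. A sketch (the function name and bounds check are ours, not part of the provider):

    ```python
    def randomness_to_temperature(randomness: int) -> float:
        """Map the DAG's 0-50 randomness param to a Cohere temperature of 0-5."""
        if not 0 <= randomness <= 50:
            raise ValueError("randomness must be between 0 and 50")
        return randomness / 10

    print(randomness_to_temperature(25))  # 2.5 (the DAG's default)
    ```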

    Screenshot of the Airflow UI showing the params available for the recipe_suggestions DAG with the default choices.

  3. Go to the include folder to view the image file created by the plot_embeddings task. The image should look similar to the one below.

    Screenshot of the image created by the plot_embeddings task showing the two dimensional representation of the closeness of recipes associated with different countries.

Step 4: (Optional) Cook your recipe

  1. Choose one of the recipes in the include folder.
  2. Navigate to your kitchen and cook the recipe you generated using Cohere with Airflow.
  3. Enjoy!

Conclusion

Congratulations! You used Airflow and Cohere to get recipe suggestions based on your pantry items. You can now use Airflow to orchestrate Cohere operations in your own machine learning pipelines.