Orchestrate OpenSearch operations with Apache Airflow

OpenSearch is an open source distributed search and analytics engine based on Apache Lucene. It offers advanced search capabilities on large bodies of text alongside powerful machine learning plugins. The OpenSearch Airflow provider offers modules to easily integrate OpenSearch with Airflow.

In this tutorial you’ll use Airflow to create an index in OpenSearch, ingest the lyrics of the musical Hamilton into the index, and run a search query on the index to see which character most often sings a specific word.

Why use Airflow with OpenSearch?

OpenSearch allows you to perform complex search queries on indexed text documents. Additionally, the tool comes with a variety of plugins for use cases such as security analytics, semantic search, and neural search.

Integrating OpenSearch with Airflow allows you to:

  • Use Airflow’s data-driven scheduling to run operations involving documents stored in OpenSearch based on upstream events in your data ecosystem, such as when a new model is trained or a new dataset is available.
  • Run dynamic queries based on upstream events in your data ecosystem or user input via Airflow params on documents and vectors stored in OpenSearch to retrieve relevant objects.
  • Add Airflow features like retries and alerts to your OpenSearch operations.

Time to complete

This tutorial takes approximately 30 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

  • The basics of OpenSearch.
  • Airflow fundamentals, such as writing DAGs and defining tasks.
  • Airflow decorators.
  • Airflow connections.

Prerequisites

This tutorial uses a local OpenSearch instance created as a Docker container. You do not need to install OpenSearch on your machine.

The example code from this tutorial is also available on GitHub.

Step 1: Configure your Astro project

  1. Create a new Astro project:

    $ mkdir astro-opensearch-tutorial && cd astro-opensearch-tutorial
    $ astro dev init
  2. Add the following two lines to your Astro project requirements.txt file to install the OpenSearch Airflow provider and the pandas package in your Astro project:

    apache-airflow-providers-opensearch==1.0.0
    pandas==1.5.3
  3. This tutorial uses a local OpenSearch instance running in a Docker container. To run an OpenSearch container as part of your Airflow environment, create a new file in your Astro project root directory called docker-compose.override.yml and copy and paste the following into it:

    version: '3.1'
    services:
      opensearch:
        image: opensearchproject/opensearch:2
        ports:
          - "9200:9200" # OpenSearch REST API
          - "9300:9300" # OpenSearch Node-to-Node communication
        environment:
          - discovery.type=single-node
          - plugins.security.ssl.http.enabled=false
        volumes:
          - opensearch-data:/usr/share/opensearch/data
        networks:
          - airflow
      # Airflow containers
      scheduler:
        networks:
          - airflow
      webserver:
        networks:
          - airflow
      triggerer:
        networks:
          - airflow
      postgres:
        networks:
          - airflow

    # volume for OpenSearch
    volumes:
      opensearch-data:
  4. Add the following configuration to your .env file to create an Airflow connection between Airflow and your OpenSearch instance. If you already have a cloud-based OpenSearch instance, you can connect to that instead of the local instance by adjusting the values in the connection.

    AIRFLOW_CONN_OPENSEARCH_DEFAULT='{
    "conn_type": "opensearch",
    "host": "opensearch",
    "port": 9200,
    "login": "admin",
    "password": "admin"
    }'
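Airflow automatically picks up any environment variable named `AIRFLOW_CONN_<CONN_ID>` as a connection, and recent Airflow versions accept the JSON format shown above. If the DAG later fails to connect, a quick sanity check is to confirm the JSON itself is well formed (parsing it manually here for illustration; Airflow performs its own deserialization):

```python
import json

# The same JSON value assigned to AIRFLOW_CONN_OPENSEARCH_DEFAULT in .env
conn_json = """{
"conn_type": "opensearch",
"host": "opensearch",
"port": 9200,
"login": "admin",
"password": "admin"
}"""

# json.loads raises a ValueError if the value is malformed,
# which is the most common cause of a silently missing connection
conn = json.loads(conn_json)
print(conn["host"], conn["port"])  # opensearch 9200
```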

Step 2: Add your data

The DAG in this tutorial uses a Kaggle dataset that contains the lyrics of the musical Hamilton.

  1. Download the hamilton_lyrics.csv from Astronomer’s GitHub.

  2. Save the file in your Astro project include folder.
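The DAG expects the CSV to have the three columns `title`, `speaker`, and `lines`, one row per sung line. A minimal sketch of reading rows in that shape with `csv.DictReader`, the same approach the DAG uses (the sample rows here are illustrative, not the real file contents):

```python
import csv
import io

# In the DAG this is open("include/hamilton_lyrics.csv"); an in-memory
# sample with the same column layout is used here for illustration
sample_csv = io.StringIO(
    "title,speaker,lines\n"
    "Non-Stop,HAMILTON,Why do you write like you're running out of time?\n"
    "Burn,ELIZA,You and your words flooded my senses\n"
)

# DictReader yields one dict per row, keyed by the header columns
reader = csv.DictReader(sample_csv)
rows = list(reader)
print(rows[0]["speaker"])  # HAMILTON
```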

Step 3: Create your DAG

  1. In your dags folder, create a file called search_hamilton.py.

  2. Copy the following code into the file.

    """
    ## Use the OpenSearch provider to ingest and search Hamilton lyrics

    This DAG uses the OpenSearch provider to create an index in OpenSearch,
    ingest Hamilton lyrics into the index, and search for which character and which song
    mention a keyword the most.
    """

    from airflow.decorators import dag, task
    from airflow.models.baseoperator import chain
    from airflow.operators.empty import EmptyOperator
    from airflow.providers.opensearch.operators.opensearch import (
        OpenSearchAddDocumentOperator,
        OpenSearchCreateIndexOperator,
        OpenSearchQueryOperator,
    )
    from airflow.providers.opensearch.hooks.opensearch import OpenSearchHook
    from pendulum import datetime
    import csv
    import uuid
    import pandas as pd

    OPENSEARCH_INDEX_NAME = "hamilton_lyrics"
    OPENSEARCH_CONN_ID = "opensearch_default"
    LYRICS_CSV_PATH = "include/hamilton_lyrics.csv"
    KEYWORD_TO_SEARCH = "write"


    @dag(
        start_date=datetime(2023, 10, 18),
        schedule=None,
        catchup=False,
    )
    def search_hamilton():
        @task.branch
        def check_if_index_exists(index_name: str, conn_id: str) -> str:
            client = OpenSearchHook(open_search_conn_id=conn_id, log_query=True).client
            is_index_exist = client.indices.exists(index_name)
            if is_index_exist:
                return "index_exists"
            return "create_index"

        create_index = OpenSearchCreateIndexOperator(
            task_id="create_index",
            opensearch_conn_id=OPENSEARCH_CONN_ID,
            index_name=OPENSEARCH_INDEX_NAME,
            index_body={
                "settings": {"index": {"number_of_shards": 1}},
                "mappings": {
                    "properties": {
                        "title": {"type": "keyword"},
                        "speaker": {"type": "keyword"},
                        "lines": {"type": "text"},
                    }
                },
            },
        )

        index_exists = EmptyOperator(task_id="index_exists")

        @task
        def csv_to_dict_list(csv_file_path: str) -> list:
            with open(csv_file_path, mode="r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                list_of_hamilton_lines = list(reader)

            list_of_kwargs = []
            for line in list_of_hamilton_lines:
                unique_line_id = uuid.uuid5(
                    name=" ".join([line["title"], line["speaker"], line["lines"]]),
                    namespace=uuid.NAMESPACE_DNS,
                )
                kwargs = {"doc_id": str(unique_line_id), "document": line}
                list_of_kwargs.append(kwargs)

            return list_of_kwargs

        list_of_document_kwargs = csv_to_dict_list(csv_file_path=LYRICS_CSV_PATH)

        add_lines_as_documents = OpenSearchAddDocumentOperator.partial(
            task_id="add_lines_as_documents",
            opensearch_conn_id=OPENSEARCH_CONN_ID,
            trigger_rule="none_failed",
            index_name=OPENSEARCH_INDEX_NAME,
        ).expand_kwargs(list_of_document_kwargs)

        search_for_keyword = OpenSearchQueryOperator(
            task_id=f"search_for_{KEYWORD_TO_SEARCH}",
            opensearch_conn_id=OPENSEARCH_CONN_ID,
            index_name=OPENSEARCH_INDEX_NAME,
            query={
                "size": 0,
                "query": {
                    "match": {"lines": {"query": KEYWORD_TO_SEARCH, "fuzziness": "AUTO"}}
                },
                "aggs": {
                    "most_mentions_person": {"terms": {"field": "speaker"}},
                    "most_mentions_song": {"terms": {"field": "title"}},
                },
            },
        )

        @task
        def print_query_result(query_result: dict, keyword: str) -> None:
            results_most_mentions_person = query_result["aggregations"][
                "most_mentions_person"
            ]["buckets"]
            results_most_mentions_song = query_result["aggregations"]["most_mentions_song"][
                "buckets"
            ]

            df_person = pd.DataFrame(results_most_mentions_person)
            df_person.columns = ["Character", f"Number of lines that include '{keyword}'"]
            df_song = pd.DataFrame(results_most_mentions_song)
            df_song.columns = ["Song", f"Number of lines that include '{keyword}'"]

            print(
                f"\n Top 3 Hamilton characters that mention '{keyword}' the most:\n ",
                df_person.head(3).to_string(index=False),
            )
            print(
                f"\n Top 3 Hamilton songs that mention '{keyword}' the most:\n ",
                df_song.head(3).to_string(index=False),
            )

        chain(
            check_if_index_exists(
                index_name=OPENSEARCH_INDEX_NAME, conn_id=OPENSEARCH_CONN_ID
            ),
            [create_index, index_exists],
            add_lines_as_documents,
        )
        chain(
            list_of_document_kwargs,
            add_lines_as_documents,
            search_for_keyword,
            print_query_result(
                query_result=search_for_keyword.output,
                keyword=KEYWORD_TO_SEARCH,
            ),
        )


    search_hamilton()

This DAG consists of seven tasks that form a simple search pipeline.

  • The check_if_index_exists task uses the @task.branch decorator to check whether the index OPENSEARCH_INDEX_NAME exists in your OpenSearch instance. If it does not exist, the task returns the string create_index, causing the downstream create_index task to run. If the index already exists, the task returns index_exists, so the empty index_exists task runs instead.
  • The create_index task, defined with the OpenSearchCreateIndexOperator, creates the index OPENSEARCH_INDEX_NAME in your OpenSearch instance with the three properties title, speaker, and lines.
  • The csv_to_dict_list task uses the @task decorator to read the lyrics of the musical Hamilton from the hamilton_lyrics.csv file into a list of Python dictionaries. Each dictionary represents one line of the musical and becomes one document in the OpenSearch index.
  • The add_lines_as_documents task is a dynamically mapped task that uses the OpenSearchAddDocumentOperator to create one mapped task instance for each document to ingest.
  • The search_for_keyword task, defined with the OpenSearchQueryOperator, performs a fuzzy query on the OpenSearch index to find the character and the song that mention the KEYWORD_TO_SEARCH the most.
  • The print_query_result task prints the query results to the task logs.
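The uuid.uuid5 call in csv_to_dict_list is worth a closer look: it derives a deterministic ID from each line's content, so re-running the DAG produces the same doc_id for every line and overwrites existing documents instead of duplicating them. A quick sketch of that behavior (the sample line is illustrative):

```python
import uuid

# A sample row in the shape csv_to_dict_list reads from the CSV
line = {
    "title": "Non-Stop",
    "speaker": "HAMILTON",
    "lines": "Why do you write like you're running out of time?",
}

# uuid5 hashes the name under a namespace, so identical input always
# yields the identical ID, unlike the random uuid4
doc_id = uuid.uuid5(
    name=" ".join([line["title"], line["speaker"], line["lines"]]),
    namespace=uuid.NAMESPACE_DNS,
)
doc_id_again = uuid.uuid5(
    name=" ".join([line["title"], line["speaker"], line["lines"]]),
    namespace=uuid.NAMESPACE_DNS,
)
print(doc_id == doc_id_again)  # True
```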

Screenshot of the Airflow UI showing the successful completion of the search_hamilton DAG in the Grid view with the Graph tab selected.

For information on more advanced search techniques in OpenSearch, see the OpenSearch documentation.
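Each terms aggregation in the query response comes back as a list of buckets, where key is the field value and doc_count is the number of matching documents. A minimal sketch of how print_query_result turns one such aggregation into a table (the response fragment here is mocked, not real query output):

```python
import pandas as pd

# Mocked fragment of an OpenSearch query response; a real response
# has the same "aggregations" -> "<agg_name>" -> "buckets" structure
query_result = {
    "aggregations": {
        "most_mentions_person": {
            "buckets": [
                {"key": "HAMILTON", "doc_count": 15},
                {"key": "ELIZA", "doc_count": 8},
                {"key": "BURR", "doc_count": 4},
            ]
        }
    }
}

# Buckets are already a list of flat dicts, so they load straight
# into a DataFrame; renaming the columns makes the log output readable
buckets = query_result["aggregations"]["most_mentions_person"]["buckets"]
df_person = pd.DataFrame(buckets)
df_person.columns = ["Character", "Number of lines that include 'write'"]
print(df_person.head(3).to_string(index=False))
```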

Step 4: Run your DAG

  1. Run astro dev start in your Astro project to start Airflow and open the Airflow UI at localhost:8080.

  2. In the Airflow UI, run the search_hamilton DAG by clicking the play button. By default, the DAG searches the lyrics for the word write, but you can change the search term by updating the KEYWORD_TO_SEARCH variable in your DAG file.

  3. View your song results in the task logs of the print_query_result task:

    [2023-11-22, 14:01:58 UTC] {logging_mixin.py:154} INFO -
    Top 3 Hamilton characters that mention 'write' the most:
    Character Number of lines that include 'write'
    HAMILTON 15
    ELIZA 8
    BURR 4
    [2023-11-22, 14:01:58 UTC] {logging_mixin.py:154} INFO -
    Top 3 Hamilton songs that mention 'write' the most:
    Song Number of lines that include 'write'
    Non-Stop 11
    Hurricane 10
    Burn 3
  4. (Optional) Listen to the song that mentions your keyword the most.

Conclusion

Congratulations! You used Airflow and OpenSearch to analyze the lyrics of Hamilton! You can now use Airflow to orchestrate OpenSearch operations in your own machine learning pipelines. History has its eyes on you.