Use Airflow object storage to interact with cloud storage in an ML pipeline

Airflow 2.8 introduced the Airflow object storage feature to simplify how you interact with remote and local object storage systems.

This tutorial demonstrates the object storage feature using a simple machine learning pipeline. The pipeline trains a classifier to predict whether a sentence is more likely to have been said by Star Trek’s Captain Kirk or Captain Picard.

Object storage is currently considered experimental and might be subject to breaking changes in future releases. For more information, see AIP-58.

Why use Airflow object storage?

Object stores are ubiquitous in modern data pipelines. They are used to store raw data, model artifacts, image, video, text, and audio files, and more. Because each object storage system has its own file naming and path conventions, it can be challenging to work with data across many different object stores.

Airflow’s object storage feature allows you to do the following (see the sketch after this list for a minimal example):

  • Abstract your interactions with object stores using a Path API. Note that some limitations apply due to the nature of different remote object storage systems. See Cloud Object Stores are not real file systems.
  • Switch between different object storage systems without having to change your DAG code.
  • Transfer files between different object storage systems without needing to use XToYTransferOperator operators.
  • Transfer large files efficiently. For object storage, Airflow uses shutil.copyfileobj() to stream files in chunks instead of loading them into memory in their entirety.
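
For example, the same pathlib-style code works whether your data lives in Amazon S3, Google Cloud Storage, Azure Blob Storage, or on the local filesystem; only the URL scheme and connection change. The following is a minimal sketch of a task that copies files to local storage, assuming a hypothetical bucket name, an existing Airflow connection with the ID my_aws_conn, and a local include/raw_copy folder in your project:

    from airflow.decorators import task
    from airflow.io.path import ObjectStoragePath

    # Hypothetical bucket and connection ID -- replace with your own.
    base = ObjectStoragePath("s3://my-example-bucket/raw/", conn_id="my_aws_conn")


    @task
    def copy_raw_to_local(base: ObjectStoragePath):
        """Copy every file under `base` to local storage, regardless of backend."""
        # Hypothetical local target folder, addressed with the file:// scheme.
        local = ObjectStoragePath("file://include/raw_copy/")
        for f in base.iterdir():
            if f.is_file():
                # .copy() streams the file in chunks rather than loading it into memory.
                f.copy(dst=local)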

Time to complete

This tutorial takes approximately 20 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

Prerequisites

Step 1: Configure your Astro project

  1. Create a new Astro project:

    $ mkdir astro-object-storage-tutorial && cd astro-object-storage-tutorial
    $ astro dev init
  2. Add the following lines to your Astro project requirements.txt file to install the Amazon provider with the s3fs extra, as well as the scikit-learn package. If you are using Google Cloud Storage or Azure Blob Storage, install the Google provider or Azure provider instead.

    apache-airflow-providers-amazon[s3fs]==8.13.0
    scikit-learn==1.3.2
  3. To create an Airflow connection to AWS S3, add the following environment variable to your .env file. Make sure to replace <your-aws-access-key-id> and <your-aws-secret-access-key> with your own AWS credentials. Adjust the connection type and parameters if you are using a different object storage system.

    AIRFLOW_CONN_MY_AWS_CONN='{
    "conn_type": "aws",
    "login": "<your-aws-access-key-id>",
    "password": "<your-aws-secret-access-key>"
    }'

    Airflow derives the connection ID from the environment variable name, so AIRFLOW_CONN_MY_AWS_CONN defines a connection with the ID my_aws_conn, which is the conn_id referenced by the DAG in Step 3.

Step 2: Prepare your data

In this example pipeline you will train a classifier to predict whether a sentence is more likely to have been said by Captain Kirk or Captain Picard. The training set consists of 3 quotes from each captain stored in .txt files.

  1. Create a new bucket in your S3 account called astro-object-storage-tutorial.
  2. In the bucket, create a folder called ingest with two subfolders kirk_quotes and picard_quotes.
  3. Upload the files from Astronomer’s GitHub repository into the respective folders. You can do this through the S3 console, or script the upload as in the sketch after this list.
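
If you prefer to script the upload instead of using the S3 console, the following is a minimal sketch using boto3. It assumes hypothetical local folders quotes/kirk_quotes and quotes/picard_quotes containing the .txt files from the repository, and that your local AWS credentials are configured:

    # Upload sketch using boto3 (not part of the tutorial DAG).
    # Assumes the .txt quote files were downloaded to quotes/kirk_quotes/
    # and quotes/picard_quotes/ (hypothetical local paths).
    from pathlib import Path

    import boto3

    BUCKET = "astro-object-storage-tutorial"
    s3 = boto3.client("s3")

    # S3 has no real folders; uploading objects under these key prefixes
    # creates the ingest/kirk_quotes and ingest/picard_quotes "folders".
    for label in ["kirk_quotes", "picard_quotes"]:
        for file in Path(f"quotes/{label}").glob("*.txt"):
            s3.upload_file(
                Filename=str(file),
                Bucket=BUCKET,
                Key=f"ingest/{label}/{file.name}",
            )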

Step 3: Create your DAG

  1. In your dags folder, create a file called object_storage_use_case.py.

  2. Copy the following code into the file.

    """
    ## Move files between object storage system locations in an MLOps pipeline

    This DAG shows the basic use of the Airflow 2.8 Object Storage feature to
    copy files between object storage system locations in an MLOps pipeline training
    a Naive Bayes Classifier to distinguish between quotes from Captain Kirk and
    Captain Picard and provide a prediction for a user-supplied quote.

    To be able to run this DAG you will need to add the contents of
    `include/ingestion_data_object_store_use_case` to your object storage system,
    install the relevant provider package for your object storage and define an
    Airflow connection to it.
    If you do not want to use remote storage you can use `file://` for local object
    storage and adjust the paths accordingly.
    """

    from airflow.decorators import dag, task
    from pendulum import datetime
    from airflow.io.path import ObjectStoragePath
    from airflow.models.baseoperator import chain
    from airflow.models.param import Param
    import joblib
    import base64
    import io

    OBJECT_STORAGE_INGEST = "s3"
    CONN_ID_INGEST = "my_aws_conn"
    PATH_INGEST = "astro-object-storage-tutorial/ingest/"

    OBJECT_STORAGE_TRAIN = "s3"
    CONN_ID_TRAIN = "my_aws_conn"
    PATH_TRAIN = "astro-object-storage-tutorial/train/"

    OBJECT_STORAGE_ARCHIVE = "file"
    CONN_ID_ARCHIVE = None
    PATH_ARCHIVE = "include/archive/"


    base_path_ingest = ObjectStoragePath(
        f"{OBJECT_STORAGE_INGEST}://{PATH_INGEST}", conn_id=CONN_ID_INGEST
    )

    base_path_train = ObjectStoragePath(
        f"{OBJECT_STORAGE_TRAIN}://{PATH_TRAIN}", conn_id=CONN_ID_TRAIN
    )

    base_path_archive = ObjectStoragePath(
        f"{OBJECT_STORAGE_ARCHIVE}://{PATH_ARCHIVE}", conn_id=CONN_ID_ARCHIVE
    )


    @dag(
        start_date=datetime(2023, 12, 1),
        schedule=None,
        catchup=False,
        tags=["ObjectStorage"],
        doc_md=__doc__,
        params={
            "my_quote": Param(
                "Time and space are creations of the human mind.",
                type="string",
                description="Enter a quote to be classified as Kirk-y or Picard-y.",
            )
        },
    )
    def object_storage_use_case():
        @task
        def list_files_ingest(base: ObjectStoragePath) -> list[ObjectStoragePath]:
            """List files in remote object storage including subdirectories."""

            labels = [obj for obj in base.iterdir() if obj.is_dir()]
            files = [f for label in labels for f in label.iterdir() if f.is_file()]
            return files

        @task
        def copy_files_ingest_to_train(src: ObjectStoragePath, dst: ObjectStoragePath):
            """Copy a file from one remote system to another.
            The file is streamed in chunks using shutil.copyfileobj."""

            src.copy(dst=dst)

        @task
        def list_files_train(base: ObjectStoragePath) -> list[ObjectStoragePath]:
            """List files in remote object storage."""

            files = [f for f in base.iterdir() if f.is_file()]
            return files

        @task
        def get_text_from_file(file: ObjectStoragePath) -> dict:
            """Read files in remote object storage."""

            bytes = file.read_block(offset=0, length=None)
            text = bytes.decode("utf-8")

            key = file.key
            filename = key.split("/")[-1]
            label = filename.split("_")[-2]
            return {"label": label, "text": text}

        @task
        def train_model(train_data: list[dict]):
            """Train a Naive Bayes Classifier using the files in the train folder."""

            from sklearn.feature_extraction.text import CountVectorizer
            from sklearn.naive_bayes import MultinomialNB
            from sklearn.pipeline import make_pipeline
            from sklearn.model_selection import train_test_split

            text_data = [d["text"] for d in train_data]
            labels = [d["label"] for d in train_data]

            X_train, X_test, y_train, y_test = train_test_split(
                text_data, labels, test_size=0.2, random_state=42
            )

            model = make_pipeline(CountVectorizer(), MultinomialNB())

            model.fit(X_train, y_train)

            buffer = io.BytesIO()
            joblib.dump(model, buffer)
            buffer.seek(0)

            encoded_model = base64.b64encode(buffer.getvalue()).decode("utf-8")

            return encoded_model

        @task
        def use_model(encoded_model: str, **context):
            """Load the model and use it for prediction."""
            my_quote = context["params"]["my_quote"]

            model_binary = base64.b64decode(encoded_model)

            buffer = io.BytesIO(model_binary)
            model = joblib.load(buffer)

            predictions = model.predict([my_quote])

            print(f"The quote: '{my_quote}'")
            print(f"sounds like it could have been said by {predictions[0].capitalize()}")

        @task
        def copy_files_train_to_archive(src: ObjectStoragePath, dst: ObjectStoragePath):
            """Copy a file from a remote system to local storage."""

            src.copy(dst=dst)

        @task
        def empty_train(base: ObjectStoragePath):
            """Empty the train folder."""

            for file in base.iterdir():
                file.unlink()

        files_ingest = list_files_ingest(base=base_path_ingest)
        files_copied = copy_files_ingest_to_train.partial(dst=base_path_train).expand(
            src=files_ingest
        )
        files_train = list_files_train(base=base_path_train)
        chain(files_copied, files_train)
        train_data = get_text_from_file.expand(file=files_train)
        encoded_model = train_model(train_data=train_data)
        use_model(encoded_model=encoded_model)
        chain(
            encoded_model,
            copy_files_train_to_archive.partial(dst=base_path_archive).expand(
                src=files_train
            ),
            empty_train(base=base_path_train),
        )


    object_storage_use_case()

    This DAG uses three different object storage locations, each of which can point to a different object storage system by changing the OBJECT_STORAGE_X, PATH_X, and CONN_ID_X constants for that location.

    • base_path_ingest: The base path for the ingestion data. This is the path to the training quotes you uploaded in Step 2.
    • base_path_train: The base path for the training data. This is the location from which the data used to train the model is read.
    • base_path_archive: The base path for the archive location, where data that has already been used for training is moved.

    The DAG consists of eight tasks that form a simple MLOps pipeline.

    • The list_files_ingest task takes the base_path_ingest as an input and iterates through the subfolders kirk_quotes and picard_quotes to return all files in the folders as individual ObjectStoragePath objects. Using the object storage feature enables you to use the .iterdir(), .is_dir() and .is_file() methods to list and evaluate object storage contents no matter which object storage system they are stored in.
    • The copy_files_ingest_to_train task is dynamically mapped over the list of files returned by the list_files_ingest task. It takes the base_path_train as an input and copies the files from the base_path_ingest to the base_path_train location, providing an example of transferring files between different object storage systems using the .copy() method of the ObjectStoragePath object. Under the hood, this method uses shutil.copyfileobj() to stream files in chunks instead of loading them into memory in their entirety.
    • The list_files_train task lists all files in the base_path_train location.
    • The get_text_from_file task is dynamically mapped over the list of files returned by the list_files_train task and reads the text from each file using the .read_block() method of the ObjectStoragePath object. Using the object storage feature enables you to switch the object storage system, for example to Azure Blob Storage, without needing to change the code (see the sketch after this list). The file name provides the label for the text, and both the label and the full quote are returned as a dictionary to be passed via XCom to the next task.
    • The train_model task trains a Naive Bayes classifier on the data returned by the get_text_from_file task. The fitted model is serialized as a base64 encoded string and passed via XCom to the next task.
    • The use_model task deserializes the trained model to run a prediction on a user-provided quote, determining whether the quote is more likely to have been said by Captain Kirk or Captain Picard. The prediction is printed to the logs.
    • The copy_files_train_to_archive task copies the files from the base_path_train to the base_path_archive location analogous to the copy_files_ingest_to_train task.
    • The empty_train task deletes all files from the base_path_train location.
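
    For example, to read the training data from Google Cloud Storage instead of S3, only the location constants at the top of the DAG need to change; the task code stays the same. The following is a sketch assuming a hypothetical GCS bucket, an existing Airflow connection with the ID my_gcs_conn, and the Google provider with GCS filesystem support installed:

        # Only the location constants change -- none of the tasks are modified.
        OBJECT_STORAGE_TRAIN = "gs"  # URL scheme for Google Cloud Storage
        CONN_ID_TRAIN = "my_gcs_conn"  # hypothetical GCS connection ID
        PATH_TRAIN = "my-gcs-bucket/train/"  # hypothetical bucket and prefix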

    Screenshot of the Airflow UI showing the successful completion of the object_storage_use_case DAG in the Grid view with the Graph tab selected.

Step 4: Run your DAG

  1. Run astro dev start in your Astro project to start Airflow, then open the Airflow UI at localhost:8080.

  2. In the Airflow UI, run the object_storage_use_case DAG by clicking the play button. Provide any quote you like to the my_quote Airflow param.

  3. After the DAG run completes, go to the task logs of the use_model task to see the prediction made by the model.

    [2023-12-11, 00:19:22 UTC] {logging_mixin.py:188} INFO - The quote: 'Time and space are creations of the human mind.'
    [2023-12-11, 00:19:22 UTC] {logging_mixin.py:188} INFO - sounds like it could have been said by Picard

Conclusion

Congratulations! You just used Airflow’s object storage feature to interact with files in different locations. To learn more about other methods and capabilities of this feature, see the OSS Airflow documentation.