Orchestrate Ray jobs on Anyscale with Apache Airflow®

Anyscale is a compute platform for AI/ML workloads built on the open-source Ray framework, which provides the layer for parallel processing and distributed computing. The Anyscale provider package for Apache Airflow® allows you to interact with Anyscale from your Airflow DAGs. This tutorial shows a simple example of how to use the Anyscale provider package to orchestrate Ray jobs on Anyscale with Airflow. For more in-depth information, see the Anyscale provider documentation.

For instructions on how to run open-source Ray jobs with Airflow, see the Orchestrate Ray jobs with Apache Airflow® tutorial.

This tutorial shows a simple implementation of the Anyscale provider package. For a more complex example, see the Processing User Feedback: an LLM-fine-tuning reference architecture with Ray on Anyscale reference architecture.

Time to complete

This tutorial takes approximately 30 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

  • Airflow fundamentals, such as writing and running DAGs.
  • Airflow operators.
  • Ray basics.

Prerequisites

  • An Anyscale account and an Anyscale API key.
  • The Astro CLI.

Step 1: Configure your Astro project

Use the Astro CLI to create and run an Airflow project on your local machine.

  1. Create a new Astro project:

    $ mkdir astro-anyscale-tutorial && cd astro-anyscale-tutorial
    $ astro dev init
  2. In the requirements.txt file, add the Anyscale provider.

    astro-provider-anyscale==1.0.1
  3. Run the following command to start your Airflow project:

    astro dev start
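
Once all containers are running, the Airflow UI is available at http://localhost:8080 by default. To confirm the project started correctly, you can list the running containers with the Astro CLI:

    $ astro dev ps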

Step 2: Configure a Ray connection

For Astro customers, Astronomer recommends using the Astro Environment Manager to store connections in an Astro-managed secrets backend. These connections can be shared across multiple deployed and local Airflow environments. See Manage Astro connections in branch-based deploy workflows. For local development, you can also define the connection as an environment variable, as shown in the sketch after these steps.

  1. In the Airflow UI, go to Admin -> Connections and click +.

  2. Create a new connection and choose the Anyscale connection type. Enter the following information:

    • Connection Id: anyscale_conn
    • Connection Type: Anyscale
    • API Key: your Anyscale API key

  3. Click Save.
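
Alternatively, for local development you can define the connection as an environment variable in your project's .env file, which the Astro CLI loads automatically. The following is a minimal sketch using Airflow's AIRFLOW_CONN_{CONN_ID} JSON format; it assumes the Anyscale API key is stored in the connection's password field, so verify the exact field mapping in the Anyscale provider documentation:

    # .env
    # Assumption: the Anyscale connection reads the API key from the password field
    AIRFLOW_CONN_ANYSCALE_CONN='{"conn_type": "anyscale", "password": "<your-anyscale-api-key>"}'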

Step 3: Write a DAG to orchestrate Anyscale jobs

  1. Create a new file in your dags directory called anyscale_script.py and add the following code. This is plain Ray code that you can also run locally; see the sketch after this list:

    # anyscale_script.py
    import argparse

    import numpy as np
    import ray


    @ray.remote
    def square(x):
        return x**2


    def main(data):
        ray.init()
        data = np.array(data)
        futures = [square.remote(x) for x in data]
        results = ray.get(futures)
        mean = np.mean(results)
        print(f"Mean squared value: {mean}")
        return mean


    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="Process some integers.")
        parser.add_argument(
            "data", nargs="+", type=float, help="List of numbers to process"
        )
        args = parser.parse_args()
        data = args.data
        main(data)
  2. Create a new file in your dags directory called anyscale_tutorial.py.

  3. Copy and paste the code below into the file:

    1"""
    2## Anyscale Tutorial
    3
    4This tutorial demonstrates how to use the Anyscale provider in Airflow to
    5parallelize a task using Ray on Anyscale.
    6"""
    7
    8from airflow.decorators import dag
    9from airflow.operators.python import PythonOperator
    10from anyscale_provider.operators.anyscale import SubmitAnyscaleJob
    11from airflow.models.baseoperator import chain
    12from pathlib import Path
    13
    14CONN_ID = "anyscale_conn"
    15FOLDER_PATH = Path(__file__).parent
    16
    17
    18def _generate_data() -> list:
    19 """
    20 Generate sample data
    21 Returns:
    22 list: List of integers
    23 """
    24 import random
    25
    26 return [random.randint(1, 100) for _ in range(10)]
    27
    28
    29@dag(
    30 start_date=None,
    31 schedule=None,
    32 catchup=False,
    33 tags=["ray", "example"],
    34 doc_md=__doc__,
    35)
    36def anyscale_tutorial():
    37
    38 data = PythonOperator(
    39 task_id="generate_data",
    40 python_callable=_generate_data,
    41 )
    42
    43 get_mean_squared_value = SubmitAnyscaleJob(
    44 task_id="SubmitRayJob",
    45 conn_id=CONN_ID,
    46 name="AstroJob",
    47 image_uri="< your image uri >", # e.g. "anyscale/ray:2.35.0-slim-py312-cpu"
    48 compute_config="< your compute config >", # e.g. airflow-integration-testing:1
    49 entrypoint="python anyscale_script.py {{ ti.xcom_pull(task_ids='generate_data') | join(' ') }}",
    50 working_dir=str(FOLDER_PATH), # the folder containing the script
    51 requirements=["requests", "pandas", "numpy", "torch"],
    52 max_retries=1,
    53 job_timeout_seconds=3000,
    54 poll_interval=30,
    55 )
    56
    57 chain(data, get_mean_squared_value)
    58
    59
    60anyscale_tutorial()
    • The generate_data task randomly generates a list of 10 integers.
    • The get_mean_squared_value task submits a Ray job on Anyscale to calculate the mean squared value of the list of integers.
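
At runtime, the templated entrypoint pulls the generated list from XCom and renders to something like python anyscale_script.py 42 7 13 … with the ten integers as arguments. Because anyscale_script.py is plain Ray code, you can smoke test it locally before triggering the DAG. This sketch assumes ray and numpy are installed in your local environment:

    $ pip install ray numpy
    $ python dags/anyscale_script.py 1 2 3

After Ray's startup logs, the script prints Mean squared value: 4.666666666666667, the mean of the squares of 1, 2, and 3.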

Step 4: Run the DAG

  1. In the Airflow UI, click the play button to manually run your DAG.

  2. After the DAG runs successfully, check your Anyscale account to see the job submitted by Airflow.

    Anyscale showing a Job completed successfully.
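
You can also inspect the submitted job from your terminal with the Anyscale CLI, assuming it is installed and authenticated (command names can vary between CLI versions):

    $ anyscale job list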

Conclusion

Congratulations! You’ve run a Ray job on Anyscale using Apache Airflow. You can now use the Anyscale provider package to orchestrate more complex jobs. For an example, see Processing User Feedback: an LLM-fine-tuning reference architecture with Ray on Anyscale.