Snowpark is the set of runtimes and libraries that securely deploy and process Python and other code in Snowflake. This includes Snowpark ML, the Python library and underlying infrastructure for end-to-end ML workflows in Snowflake. Snowpark ML has two components: Snowpark ML Modeling for model development, and Snowpark ML Operations, which includes the Snowpark Model Registry, for model deployment and management.
In this tutorial, you’ll learn how to:
The provider used in this tutorial is currently in beta and both its contents and decorators are subject to change. After the official release, this tutorial will be updated.

Snowpark allows you to use Python to perform transformations and machine learning operations on data stored in Snowflake.
Integrating Snowpark for Python with Airflow offers the benefits of:
The Snowpark provider for Airflow simplifies interacting with Snowpark by:
Additionally, this tutorial shows how to use Snowflake as a custom XCom backend. This is especially useful for organizations with strict compliance requirements who want to keep all their data in Snowflake, but still leverage Airflow XCom to pass data between tasks.
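The following toy, in-memory sketch illustrates the general idea behind a custom XCom backend. The payload is written to storage you control, and only a short reference is passed between tasks. It is not the provider's Snowflake backend. In real Airflow, a custom backend subclasses BaseXCom and overrides serialize_value and deserialize_value. The class name ToyExternalXComStore, its simplified method signatures, and the xcom-ref: reference format are all hypothetical.

```python
import json
import uuid
from typing import Any, Dict


class ToyExternalXComStore:
    """Toy model of a custom XCom backend: payloads live in external storage,
    and only a short reference string is handed from task to task."""

    def __init__(self) -> None:
        # Stand-in for a table in storage you control (for example, Snowflake).
        self._table: Dict[str, str] = {}

    def serialize_value(self, value: Any, dag_id: str, task_id: str, run_id: str, key: str) -> str:
        # Store the JSON payload externally and return only a reference to it.
        ref = f"xcom-ref:{dag_id}/{task_id}/{run_id}/{key}/{uuid.uuid4().hex}"
        self._table[ref] = json.dumps(value)
        return ref

    def deserialize_value(self, ref: str) -> Any:
        # Resolve the reference back to the original payload.
        return json.loads(self._table[ref])


store = ToyExternalXComStore()
ref = store.serialize_value({"rows": 3}, "my_dag", "load_file", "manual_run", "return_value")
print(store.deserialize_value(ref))  # {'rows': 3}
```

Because the reference embeds the DAG, task, run, and key, each stored value stays traceable to the task instance that produced it, while the data itself never leaves your own storage.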
This tutorial takes approximately 45 minutes to complete.
To get the most out of this tutorial, make sure you have an understanding of:
The Astro CLI.
A Snowflake account. A 30-day free trial is available. You need to have at least one database and one schema created to store the data and models used in this tutorial.
(Optional) This tutorial includes instructions on how to use the Snowflake custom XCom backend included in the provider. If you want to use this custom XCom backend, you need to either:
Run the DAG using a Snowflake account with ACCOUNTADMIN privileges to allow the DAG’s first task to create the required database, schema, stage and table. See Step 3.3 for more instructions. The free trial account has the required privileges.
Ask your Snowflake administrator to:
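Create a database, schema, and stage for the custom XCom backend, and give you their names for the AIRFLOW__CORE__XCOM_SNOWFLAKE_TABLE and AIRFLOW__CORE__XCOM_SNOWFLAKE_STAGE environment variables.
Create an XCOM_TABLE with the following schema:
The example code from this tutorial is also available on GitHub.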
Create a new Astro project:
Create a new file in your Astro project’s root directory called requirements-snowpark.txt. This file contains all Python packages that you install in your reusable Snowpark environment.
Change the content of your Astro project's Dockerfile to the following. It uses the Astro venv buildkit to create a virtual environment called snowpark with Python 3.8, and installs the packages from requirements-snowpark.txt in it. This tutorial includes Snowpark Python tasks that run in virtual environments, which is a common production pattern for simplifying dependency management.
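The Astro venv buildkit creates the environment inside the Docker image. The sketch below is a rough plain-Python analogue, not what the buildkit runs internally. It uses the standard library venv module to create an environment, optionally installs a requirements file such as requirements-snowpark.txt into it, and returns the environment's interpreter path. A binary path like this is what the python parameter of @task.snowpark_ext_python expects. Unlike the Dockerfile, this sketch uses whatever Python runs it rather than pinning Python 3.8. The function name build_venv is hypothetical.

```python
import os
import subprocess
import venv
from pathlib import Path
from typing import Optional


def build_venv(env_dir: str, requirements_file: Optional[str] = None) -> str:
    """Create a virtual environment in env_dir and return its Python binary path."""
    # Only bootstrap pip into the environment if we actually install packages.
    venv.create(env_dir, with_pip=requirements_file is not None, clear=True)
    if os.name == "nt":
        python_bin = Path(env_dir) / "Scripts" / "python.exe"
    else:
        python_bin = Path(env_dir) / "bin" / "python"
    if requirements_file is not None:
        # Install the requirements with the environment's own pip.
        subprocess.run(
            [str(python_bin), "-m", "pip", "install", "-r", requirements_file],
            check=True,
        )
    return str(python_bin)
```

For example, build_venv("/tmp/snowpark", "requirements-snowpark.txt") would return a path ending in bin/python on Linux. Tasks that point at that interpreter share one prebuilt environment instead of rebuilding it on every run.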
Add the following package to your packages.txt file:
Add the following packages to your requirements.txt file. The Astro Snowflake provider is installed from the whl file.
The Astro Snowflake provider is currently in beta. Classes from this provider might be subject to change and will be included in the Snowflake provider in a future release.
To create an Airflow connection to Snowflake and allow serialization of Astro Python SDK objects, add the following to your .env file. Make sure to enter your own Snowflake credentials as well as the name of an existing database and schema.
For more information on creating a Snowflake connection, see Create a Snowflake connection in Airflow.
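If you would rather generate the connection line than type it by hand, the following sketch builds an AIRFLOW_CONN_<CONN_ID> variable in Airflow's JSON connection format, which Airflow 2.3 and later supports. The connection ID snowflake_default and the extra field names account, warehouse, database, and role are assumptions. Check which field names your Snowflake provider version expects. The Astro Python SDK serialization setting is not covered here, and the function name is hypothetical.

```python
import json
from typing import Optional


def snowflake_conn_env_line(
    conn_id: str,
    login: str,
    password: str,
    account: str,
    warehouse: str,
    database: str,
    schema: str,
    role: Optional[str] = None,
) -> str:
    """Return one .env line defining an Airflow Snowflake connection as JSON."""
    # Assumed extra field names; older provider versions may expect other keys.
    extra = {"account": account, "warehouse": warehouse, "database": database}
    if role:
        extra["role"] = role
    payload = {
        "conn_type": "snowflake",
        "login": login,
        "password": password,
        "schema": schema,
        "extra": extra,
    }
    serialized = json.dumps(payload)
    # The value is wrapped in single quotes, so a single quote inside it would break the line.
    if "'" in serialized:
        raise ValueError("values containing single quotes would break the quoted .env line")
    # Airflow reads connections from AIRFLOW_CONN_<CONN_ID> environment variables.
    return f"AIRFLOW_CONN_{conn_id.upper()}='{serialized}'"


print(snowflake_conn_env_line(
    "snowflake_default", "MY_USER", "MY_PASSWORD", "MY_ACCOUNT",
    "MY_WAREHOUSE", "MY_DATABASE", "MY_SCHEMA", role="MY_ROLE",
))
```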
(Optional) If you want to use a Snowflake custom XCom backend, add the following additional variables to your .env file. Replace the values with the names of your own database, schema, table, and stage if you are not using the suggested values.
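The sketch below assembles the two XCom variables named earlier in this tutorial, AIRFLOW__CORE__XCOM_SNOWFLAKE_TABLE and AIRFLOW__CORE__XCOM_SNOWFLAKE_STAGE. It also rejects names that are not plain unquoted Snowflake identifiers. It assumes the values are fully qualified DATABASE.SCHEMA.OBJECT names and that the table and stage are called XCOM_TABLE and XCOM_STAGE. The helper name and its defaults are hypothetical, and any other variables the backend needs are not shown.

```python
import re
from typing import List

# Snowflake unquoted identifiers start with a letter or underscore and then
# contain only letters, digits, underscores, or dollar signs.
_UNQUOTED_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def xcom_backend_env_lines(
    database: str, schema: str, table: str = "XCOM_TABLE", stage: str = "XCOM_STAGE"
) -> List[str]:
    """Return .env lines pointing the custom XCom backend at a table and a stage."""
    for name in (database, schema, table, stage):
        if len(name) > 255 or not _UNQUOTED_IDENTIFIER.fullmatch(name):
            raise ValueError(f"not a plain unquoted Snowflake identifier: {name!r}")
    return [
        f"AIRFLOW__CORE__XCOM_SNOWFLAKE_TABLE='{database}.{schema}.{table}'",
        f"AIRFLOW__CORE__XCOM_SNOWFLAKE_STAGE='{database}.{schema}.{stage}'",
    ]


for line in xcom_backend_env_lines("MY_XCOM_DB", "MY_XCOM_SCHEMA"):
    print(line)
```

Validating the names up front surfaces a typo in the .env file before Airflow starts, rather than at the first XCom write.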
The DAG in this tutorial runs a classification model on synthetic data to predict which afternoon beverage a skier will choose based on attributes like ski color, ski resort, and amount of new snow. The data is generated using this script.
Create a new directory in your Astro project’s include directory called data.
Download the dataset from Astronomer’s GitHub and save it in include/data.
In your dags folder, create a file called airflow_with_snowpark_tutorial.py.
Copy the following code into the file. Make sure to provide your Snowflake database and schema names to MY_SNOWFLAKE_DATABASE and MY_SNOWFLAKE_SCHEMA.
This DAG consists of eight tasks in a simple ML orchestration pipeline.
(Optional) create_snowflake_objects: Creates the Snowflake objects required for the Snowflake custom XCom backend. This task uses the @task.snowpark_python decorator to run code within Snowpark, automatically instantiating a Snowpark session called snowpark_session from the connection ID provided to the snowflake_conn_id parameter. This task is a setup task and is only shown in the DAG graph if you set SETUP_TEARDOWN_SNOWFLAKE_CUSTOM_XCOM_BACKEND to True. See also Step 3.3.
load_file: Loads the data from the ski_dataset.csv file into the Snowflake table MY_SNOWFLAKE_TABLE using the load_file operator from the Astro Python SDK.
create_model_registry: Creates a model registry in Snowpark using the Snowpark ML package. Because the task is defined with the @task.snowpark_python decorator, the Snowpark session is automatically instantiated from the provided connection ID.
transform_table_step_one: Transforms the data in the Snowflake table using Snowpark syntax to filter to only include rows of skiers that ordered the beverages we are interested in. Computation of this task runs within Snowpark. The resulting table is written to XCom as a pandas DataFrame.
transform_table_step_two: Transforms the pandas DataFrame created by the upstream task to filter only for serious skiers (those who skied at least one hour that day).
This task uses the @task.snowpark_ext_python decorator, running the code in the Snowpark virtual environment created in Step 1. The binary provided to the python parameter of the decorator determines which virtual environment to run a task in. The @task.snowpark_ext_python decorator works analogously to the @task.external_python decorator, except the code is executed within Snowpark’s compute.
train_beverage_classifier: Trains a Snowpark Logistic Regression model on the dataset, saves the model to the model registry, and creates predictions from a test dataset. This task uses the @task.snowpark_virtualenv decorator to run the code in a newly created virtual environment within Snowpark’s compute. The requirements parameter of the decorator specifies the packages to install in the virtual environment. The model predictions are saved to XCom as a pandas DataFrame.
plot_metrics: Creates a plot of the model performance metrics and saves it to the include directory. This task runs in the Airflow environment using the @task decorator.
(Optional) cleanup_xcom_table: Cleans up the Snowflake custom XCom backend by dropping the XCOM_TABLE and XCOM_STAGE. This task is a teardown task and is only shown in the DAG graph if you set SETUP_TEARDOWN_SNOWFLAKE_CUSTOM_XCOM_BACKEND to True. See also Step 3.3.
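The two transformation steps come down to simple row filters. The following plain-Python sketch shows that logic on a list of dictionaries. In the DAG, step one runs as a Snowpark DataFrame operation and step two runs on a pandas DataFrame. The column names afternoon_beverage and hours_on_slopes, the beverage labels, and the function names are hypothetical and are not taken from the tutorial dataset.

```python
from typing import Dict, FrozenSet, Iterable, List

Row = Dict[str, object]

# Hypothetical beverage labels standing in for "the beverages we are interested in".
BEVERAGES_OF_INTEREST: FrozenSet[str] = frozenset({"coffee", "tea", "hot_chocolate"})


def filter_beverages(rows: Iterable[Row], beverages: FrozenSet[str] = BEVERAGES_OF_INTEREST) -> List[Row]:
    """Step one: keep only skiers who ordered one of the beverages of interest."""
    return [row for row in rows if row["afternoon_beverage"] in beverages]


def filter_serious_skiers(rows: Iterable[Row], min_hours: float = 1.0) -> List[Row]:
    """Step two: keep only skiers who skied at least min_hours that day."""
    return [row for row in rows if float(row["hours_on_slopes"]) >= min_hours]


# Made-up rows for illustration only.
rows = [
    {"afternoon_beverage": "coffee", "hours_on_slopes": 2.5},
    {"afternoon_beverage": "soda", "hours_on_slopes": 3.0},
    {"afternoon_beverage": "tea", "hours_on_slopes": 0.5},
]
print(filter_serious_skiers(filter_beverages(rows)))
# [{'afternoon_beverage': 'coffee', 'hours_on_slopes': 2.5}]
```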
(Optional) This DAG has two optional features you can enable.
If you want to use setup/teardown tasks to create and clean up a Snowflake custom XCom backend for this DAG, set SETUP_TEARDOWN_SNOWFLAKE_CUSTOM_XCOM_BACKEND to True. This setting adds the create_snowflake_objects and cleanup_xcom_table tasks to your DAG and creates a setup/teardown workflow. Note that your Snowflake account needs ACCOUNTADMIN privileges to perform the operations in the create_snowflake_objects task, and you need to define the environment variables described in Step 1.8 to enable the custom XCom backend.
If you want to use a Snowpark-optimized warehouse for model training, set the USE_SNOWPARK_WH variable to True and provide your warehouse names to MY_SNOWPARK_WAREHOUSE and MY_SNOWFLAKE_REGULAR_WAREHOUSE. If the create_snowflake_objects task is enabled, it creates the MY_SNOWPARK_WAREHOUSE warehouse. Otherwise, you need to create the warehouse manually before running the DAG.
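The setup/teardown option in the list above relies on Airflow running a teardown task once its setup task has succeeded, even if work tasks in between fail. The following plain-Python analogy models that behavior for a linear chain of steps using try/finally. It illustrates the semantics only and is not Airflow code. The function name run_with_setup_teardown is hypothetical.

```python
from typing import Callable, List


def run_with_setup_teardown(
    setup: Callable[[], None],
    work: List[Callable[[], None]],
    teardown: Callable[[], None],
) -> None:
    """Run setup, then each work step, and always tear down once setup succeeded."""
    # If setup raises, nothing was created, so there is nothing to tear down.
    setup()
    try:
        for step in work:
            step()  # a failure here stops the remaining steps of the chain
    finally:
        # Runs whether the work steps succeeded or raised; the error still propagates.
        teardown()
```

Applied to this DAG, setup corresponds to create_snowflake_objects and teardown to cleanup_xcom_table. The XCom table and stage are dropped even if, for example, model training fails.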
While this tutorial DAG uses a small dataset where model training can be accomplished using the standard Snowflake warehouse, Astronomer recommends using a Snowpark-optimized warehouse for model training in production.
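As a sketch of the USE_SNOWPARK_WH switch, the helpers below pick the warehouse for model training and build the SQL you could run to create the Snowpark-optimized warehouse manually. The function names are hypothetical. MEDIUM is the default size because Snowflake does not offer Snowpark-optimized warehouses at the X-Small and Small sizes. Inside a Snowpark task, Session.use_warehouse() is one way to switch warehouses, but the exact mechanism the tutorial DAG uses is not shown here.

```python
def training_warehouse(use_snowpark_wh: bool, snowpark_wh: str, regular_wh: str) -> str:
    """Return the warehouse name to use for model training."""
    return snowpark_wh if use_snowpark_wh else regular_wh


def create_snowpark_wh_sql(name: str, size: str = "MEDIUM") -> str:
    """Return SQL that creates a Snowpark-optimized warehouse if it does not exist."""
    return (
        f"CREATE WAREHOUSE IF NOT EXISTS {name} "
        f"WITH WAREHOUSE_SIZE = '{size}' WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'"
    )


USE_SNOWPARK_WH = True
MY_SNOWPARK_WAREHOUSE = "SNOWPARK_WH"  # hypothetical name
MY_SNOWFLAKE_REGULAR_WAREHOUSE = "REGULAR_WH"  # hypothetical name

print(training_warehouse(USE_SNOWPARK_WH, MY_SNOWPARK_WAREHOUSE, MY_SNOWFLAKE_REGULAR_WAREHOUSE))
print(create_snowpark_wh_sql(MY_SNOWPARK_WAREHOUSE))
```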
Run astro dev start in your Astro project to start up Airflow and open the Airflow UI at localhost:8080.
In the Airflow UI, run the airflow_with_snowpark_tutorial DAG by clicking the play button.


In the Snowflake UI, view the model registry to see the model that was created by the DAG. In a production context, you can pull a specific model from the registry to run predictions on new data.

Navigate to your include directory to view the metrics.png image, which contains the model performance metrics shown at the start of this tutorial.
Congratulations! You trained a classification model in Snowpark using Airflow. This pipeline shows the three main options to run code in Snowpark using Airflow decorators:
@task.snowpark_python runs your code in a standard Snowpark environment. Use this decorator if your code doesn't need any packages beyond those preinstalled in a standard Snowpark environment. The corresponding traditional operator is the SnowparkPythonOperator.
@task.snowpark_ext_python runs your code in a pre-existing virtual environment within Snowpark. Use this decorator when you want to reuse a virtual environment in several tasks in the same Airflow instance, or when your virtual environment takes a long time to build. The corresponding traditional operator is the SnowparkExternalPythonOperator.
@task.snowpark_virtualenv runs your code in a virtual environment in Snowpark that is created at runtime for that specific task. Use this decorator when you want to tailor a virtual environment to a task and don't need to reuse it. The corresponding traditional operator is the SnowparkVirtualenvOperator.
You can import the corresponding traditional operators as follows:
from snowpark_provider.operators.snowpark import SnowparkPythonOperator
from snowpark_provider.operators.snowpark import SnowparkExternalPythonOperator
from snowpark_provider.operators.snowpark import SnowparkVirtualenvOperator
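The guidance above can be summarized as a small decision function. It is only a mnemonic for choosing between the three options, and the function name is hypothetical. It returns the decorator and its corresponding traditional operator as plain strings and does not import the provider.

```python
from typing import Tuple


def choose_snowpark_option(needs_extra_packages: bool, prefer_prebuilt_env: bool) -> Tuple[str, str]:
    """Return (decorator, traditional operator) for running a task in Snowpark.

    needs_extra_packages: the code needs packages beyond a standard Snowpark environment.
    prefer_prebuilt_env: the environment is shared across tasks or slow to build.
    """
    if not needs_extra_packages:
        return "@task.snowpark_python", "SnowparkPythonOperator"
    if prefer_prebuilt_env:
        return "@task.snowpark_ext_python", "SnowparkExternalPythonOperator"
    return "@task.snowpark_virtualenv", "SnowparkVirtualenvOperator"


print(choose_snowpark_option(needs_extra_packages=True, prefer_prebuilt_env=False))
# ('@task.snowpark_virtualenv', 'SnowparkVirtualenvOperator')
```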