Orchestrate Great Expectations with Airflow
Info
This page has not yet been updated for Airflow 3. The concepts shown are relevant, but some code may need to be updated. If you run any examples, take care to update import statements and watch for any other breaking changes.
Great Expectations (GX) is an open source Python-based data validation framework. You can test your data by expressing what you “expect” from it as simple declarative statements in JSON or YAML, then run validations using those Expectation Suites against SQL data, filesystem data, or a pandas DataFrame. Astronomer, with help from Superconductive, maintains the Great Expectations Airflow Provider, which gives users a convenient method for running validations directly from their DAGs.
This tutorial shows how to use the GreatExpectationsOperator in an Airflow DAG, leveraging automatic creation of a default Checkpoint and connecting via an Airflow connection.
Other ways to learn
There are multiple resources for learning about this topic.
Time to complete
This tutorial takes approximately 20 minutes to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of Great Expectations. See Try GX Core.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
- Airflow connections. See Managing your Connections in Apache Airflow.
Prerequisites
- The Astro CLI.
- Access to a SQL database. This tutorial uses a local Postgres instance.
- (Optional) Local installation of the great_expectations package.
Step 1: Configure your Astro project
To use GX with Airflow, install the Great Expectations Airflow Provider in your Astro project.
- Create a new Astro project. (Example commands for this and the following step are shown after this list.)
- Add the GX Airflow provider to your Astro project requirements.txt file.
- Add the required GX dependency to your Astro project packages.txt file.
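The exact snippets aren't reproduced on this page, but a typical setup looks like the following sketch. The project directory name astro-gx-tutorial is arbitrary, and the requirements.txt entry is the provider's package name on PyPI, left unpinned here.

```sh
# Create an empty directory and scaffold a new Astro project inside it
mkdir astro-gx-tutorial && cd astro-gx-tutorial
astro dev init
```

Then add the provider to requirements.txt:

```text
airflow-provider-great-expectations
```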
Step 2: Configure a GX project
The Great Expectations Airflow Provider requires a GX project to be present in your Airflow environment. The easiest way to create a GX project is by using the great_expectations package and following the steps below. If you cannot install the GX package locally, you can copy the great_expectations folder from this GitHub repository into your Astro project include folder instead and continue this tutorial at Step 3.
- In your include folder, create a new file called gx_init_script.py. Then, copy and paste the Data Context initialization code into the file. (A sketch of this script is shown after this list.)
- Go to your include folder and run the gx_init_script.py file to instantiate a Data Context at include/great_expectations.
- Create a new file in your include/great_expectations/expectations folder called strawberry_suite.json. Then, copy and paste the suite definition into the file. (A sketch of this file is shown after this list.) This JSON file defines an Expectation Suite containing one Expectation of the type expect_table_row_count_to_be_between, which checks that the number of rows in a table is between 2 and 1000. You should now have the file structure shown after this list in your include folder.
- Run astro dev start to start your Astro project.
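The initialization script isn't reproduced on this page. A minimal sketch, assuming a GX 0.x release where the FileDataContext.create API is available, looks like this:

```python
# include/gx_init_script.py
# Creates a file-backed Data Context in the current working directory, so running
# `python gx_init_script.py` from the include folder creates include/great_expectations.
from great_expectations.data_context import FileDataContext

context = FileDataContext.create(project_root_dir=".")
print(f"Data Context created at: {context.root_directory}")
```

The Expectation Suite file isn't reproduced either. A suite matching the description above, written in the classic GX 0.x suite JSON format, could look like this:

```json
{
  "expectation_suite_name": "strawberry_suite",
  "expectations": [
    {
      "expectation_type": "expect_table_row_count_to_be_between",
      "kwargs": {
        "min_value": 2,
        "max_value": 1000
      },
      "meta": {}
    }
  ],
  "meta": {}
}
```

After these steps, the include folder typically contains a scaffold similar to the following; the exact subfolders created by GX can vary by version:

```text
include/
├── gx_init_script.py
└── great_expectations/
    ├── great_expectations.yml
    ├── checkpoints/
    ├── expectations/
    │   └── strawberry_suite.json
    ├── plugins/
    └── uncommitted/
```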
Step 3: Create a database connection
The easiest way to use GX with Airflow is to let the GreatExpectationsOperator create a default Checkpoint and Datasource based on an Airflow connection. To set up a connection to a Postgres database, complete the following steps:
- In the Airflow UI at localhost:8080, go to Admin -> Connections and click +.
- Create a new connection named postgres_default using the following information:
  - Connection Id: postgres_default.
  - Connection Type: Postgres.
  - Host: <your postgres host>.
  - Login: <your postgres username>.
  - Password: <your postgres password>.
  - Schema: postgres. Note that this is the name of the PostgreSQL database, not of the schema in the database. See the PostgreSQL provider documentation.
  - Port: <your postgres port>.
- Click Save.
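If you prefer to define the connection outside the UI, the same values can be supplied as an Airflow connection URI in an environment variable, for example in your Astro project's .env file. The placeholders below mirror the fields above:

```text
AIRFLOW_CONN_POSTGRES_DEFAULT='postgres://<your postgres username>:<your postgres password>@<your postgres host>:<your postgres port>/postgres'
```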
Step 4: Create a DAG
- Create a new file in your dags folder called gx_tutorial.py.
- Copy and paste the DAG code into the file. (A sketch of the DAG is shown after this list.) This DAG creates a table in your Postgres database, runs a GX validation on the table, and then drops the table. The data in the table is validated using the GreatExpectationsOperator (GXO). The operator automatically creates a default Checkpoint and Datasource based on the postgres_default connection and runs the Expectations defined in the strawberry_suite.json file on the strawberries table. Note that instead of using the schema parameter of the GXO, you can also provide the schema name to the data_asset_name parameter in the form of my_schema_name.my_table_name.
- Open Airflow at http://localhost:8080/ and run the DAG manually by clicking the play button.
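The tutorial's DAG code isn't reproduced on this page. The sketch below follows the description above; the table columns and sample rows, the pendulum start date, and the use of SQLExecuteQueryOperator for the setup and teardown queries are assumptions, while the GreatExpectationsOperator arguments use the provider's documented parameter names.

```python
"""
dags/gx_tutorial.py -- a sketch of the tutorial DAG described above.
"""
from pendulum import datetime

from airflow.decorators import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

POSTGRES_CONN_ID = "postgres_default"


@dag(
    start_date=datetime(2023, 7, 1),
    schedule=None,
    catchup=False,
)
def gx_tutorial():
    # Create the table and insert a few illustrative rows.
    create_table = SQLExecuteQueryOperator(
        task_id="create_table",
        conn_id=POSTGRES_CONN_ID,
        sql="""
            CREATE TABLE IF NOT EXISTS strawberries (
                id INT,
                name VARCHAR(255),
                amount INT
            );
            INSERT INTO strawberries (id, name, amount)
            VALUES (1, 'Mara des bois', 3), (2, 'Senga Sengana', 8), (3, 'Elsanta', 5);
        """,
    )

    # Let the operator build a default Datasource and Checkpoint from the
    # postgres_default connection and run the strawberry_suite Expectation Suite.
    gx_validate = GreatExpectationsOperator(
        task_id="gx_validate",
        conn_id=POSTGRES_CONN_ID,
        data_context_root_dir="include/great_expectations",
        schema="public",  # assumes the table lives in the default public schema
        data_asset_name="strawberries",
        expectation_suite_name="strawberry_suite",
        return_json_dict=True,
    )

    # Clean up the table after validation.
    drop_table = SQLExecuteQueryOperator(
        task_id="drop_table",
        conn_id=POSTGRES_CONN_ID,
        sql="DROP TABLE IF EXISTS strawberries;",
    )

    create_table >> gx_validate >> drop_table


gx_tutorial()
```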
Info
By default, the GreatExpectationsOperator pushes a CheckpointResult object to XCom. You can instead return a JSON-serializable dictionary by setting the return_json_dict parameter to True. If you do not want to use this built-in serialization, you can either enable XCom pickling by setting the environment variable AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True, or use a custom serialization method in a custom XCom backend.
How it works
The GreatExpectationsOperator is a versatile operator allowing you to integrate GX into your Airflow environment. This tutorial shows the simplest way of using the operator by letting it create a default Checkpoint and Datasource based on an Airflow connection.
The GreatExpectationsOperator also allows you to pass in a CheckpointConfig object using the checkpoint_config parameter or a checkpoint_kwargs dictionary. You can also customize the Execution Engines and pass DataContextConfig objects by configuring Operator parameters.
For more examples, check out the Get Improved Data Quality Checks in Airflow with the Updated Great Expectations Operator blog post and the Airflow data quality demo repository.
Operator parameters
The GreatExpectationsOperator is highly customizable, which allows expert GX users to use their own custom objects. This section explains some of the most commonly used parameters. Refer to the GX documentation for in-depth explanations of GX concepts.
When using the GreatExpectationsOperator, you must pass in one of the following parameters:
- data_context_root_dir (str): The path to your GX project directory. This is the directory containing your great_expectations.yml file.
- data_context_config (DataContextConfig): To use an in-memory Data Context, a DataContextConfig must be defined and passed to the operator.
Next, the operator will determine the Checkpoint and Datasource used to run your GX validations:
- If your Data Context does not specify a Datasource, and you do not pass in a Checkpoint, the operator builds a Datasource and Checkpoint for you based on the conn_id you pass in and runs the given Expectation Suite. This is the simplest way to use the operator and is what is shown in this tutorial.
- If your project's Data Context already specifies Datasources, all you need to pass is the Data Context and the Expectation Suite. The operator uses the Datasources in the Data Context to create a default Checkpoint.
- If your project's Store already contains a Checkpoint, you can specify its use by passing the name to the checkpoint_name parameter. The CheckpointStore is often in the great_expectations/checkpoints/ path, so that checkpoint_name = "strawberries.pass.chk" would reference the file great_expectations/checkpoints/strawberries/pass/chk.yml. (A sketch is shown after this list.)
- If you want to run a custom Checkpoint, you can either pass a CheckpointConfig object to checkpoint_config or a dictionary of your Checkpoint config to checkpoint_kwargs. checkpoint_kwargs can also be used to specify additional configuration overrides.
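For illustration, running an existing Checkpoint by name might look like the following sketch. The Checkpoint name is the hypothetical one from the example above, and the Checkpoint is assumed to already reference a Datasource and an Expectation Suite, so no conn_id or expectation_suite_name is passed.

```python
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

# Runs the Checkpoint stored at great_expectations/checkpoints/strawberries/pass/chk.yml.
gx_run_checkpoint = GreatExpectationsOperator(
    task_id="gx_run_checkpoint",
    data_context_root_dir="include/great_expectations",
    checkpoint_name="strawberries.pass.chk",
)
```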
The Datasource can also be a pandas DataFrame, as shown in Running GX validations on pandas DataFrames. Depending on how you define your Datasource, the data_asset_name parameter has to be adjusted:
- If a pandas DataFrame is passed, the data_asset_name parameter can be any name that helps you identify the DataFrame.
- If a conn_id is supplied, the data_asset_name must be the name of the table the Expectation Suite runs on.
By default, a Great Expectations task runs validations and raises an AirflowException if any of the tests fail. To override this behavior and continue running the pipeline even if tests fail, set the fail_task_on_validation_failure flag to False.
In Astronomer or any environment with OpenLineage configured, the GreatExpectationsOperator automatically adds the OpenLineage action to its default action list when a Checkpoint is not passed to the operator. To turn off this feature, set the use_open_lineage parameter to False.
For a full list of parameters, see the Astronomer registry. For more information about the parameters and examples, see the README in the provider repository.
Running GX validations on pandas DataFrames
The GreatExpectationsOperator can also be used to run validations on CSV files by passing them in as a pandas DataFrame. This pattern is useful for testing pipelines locally with small amounts of data. Note that the execution_engine parameter needs to be adjusted, as shown in the sketch below.
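A sketch of this pattern, assuming a hypothetical CSV file at include/strawberries.csv and using the provider's dataframe_to_validate and execution_engine parameters:

```python
import pandas as pd

from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

# Validate a small DataFrame read from a CSV file instead of a database table.
gx_validate_df = GreatExpectationsOperator(
    task_id="gx_validate_df",
    data_context_root_dir="include/great_expectations",
    dataframe_to_validate=pd.read_csv("include/strawberries.csv"),
    execution_engine="PandasExecutionEngine",
    data_asset_name="strawberries_csv_data",
    expectation_suite_name="strawberry_suite",
    return_json_dict=True,
)
```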
Connections and backends
The GreatExpectationsOperator can run a Checkpoint on a dataset stored in any backend compatible with GX. This includes BigQuery, MSSQL, MySQL, PostgreSQL, Redshift, Snowflake, SQLite, and Athena, among others. All that's needed to point the GreatExpectationsOperator at an external dataset is to set up an Airflow connection to the Datasource and set the conn_id parameter. Connections still work if you have your connection configured in both Airflow and Great Expectations, as long as the correct Datasource is specified in the Checkpoint passed to the operator.