Orchestrate Great Expectations with Airflow

Great Expectations (GX) is an open source, Python-based data validation framework. You test your data by expressing what you “expect” from it as declarative Expectation statements, then run validations using those Expectation Suites against SQL data, filesystem data, or a pandas DataFrame. The airflow-provider-great-expectations package provides operators for running Great Expectations validations directly in your DAGs.

Version 1.0.0 of the provider introduces three specialized operators that replace the legacy GreatExpectationsOperator:

| Operator | Use case |
| --- | --- |
| GXValidateDataFrameOperator | Validate in-memory Spark or pandas DataFrames |
| GXValidateBatchOperator | Validate data not in memory using a BatchDefinition |
| GXValidateCheckpointOperator | Most feature-rich: supports triggering actions on validation results |

Requirements: Python 3.10+, Great Expectations 1.7.0+, Apache Airflow 2.1+.

Install with:

```bash
pip install airflow-provider-great-expectations
```

Each operator has its own import path:

```python
from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator
from great_expectations_provider.operators.validate_batch import GXValidateBatchOperator
from great_expectations_provider.operators.validate_checkpoint import GXValidateCheckpointOperator
```

Choosing the right operator

When deciding which operator fits your use case, consider:

  1. Where is your data? In memory as a DataFrame, or in an external data source?
  2. Do you need to trigger actions, such as sending notifications or updating external systems based on validation results?
  3. What Data Context do you need? Ephemeral for stateless validations, or persistent to track results over time.

| Scenario | Recommended operator |
| --- | --- |
| Data already in memory as a pandas or Spark DataFrame | GXValidateDataFrameOperator |
| Data in a database, warehouse, or file system | GXValidateBatchOperator |
| Need to trigger Slack notifications, emails, or other actions | GXValidateCheckpointOperator |
| Want full GX Core features with ValidationDefinitions | GXValidateCheckpointOperator |

GXValidateDataFrameOperator

Use this operator when your data is already in memory as a pandas or Spark DataFrame. This is the simplest option: you only need a DataFrame and your expectations.

```python
from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator


def configure_dataframe():
    """Load data into a DataFrame using an Airflow hook."""
    from airflow.providers.common.sql.hooks.sql import DbApiHook

    hook = DbApiHook.get_hook("my_db_conn")
    return hook.get_pandas_df("SELECT * FROM daily_planet_report")


def configure_expectations(dataframe):
    """Define expectations for the DataFrame."""
    import great_expectations as gx

    suite = gx.ExpectationSuite(name="daily_report_suite")
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToNotBeNull(column="planet_name")
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToNotBeNull(column="total_passengers")
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="total_net_fare_usd", min_value=0
        )
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="total_discounts_usd", min_value=0
        )
    )
    return suite


_validate_dataframe = GXValidateDataFrameOperator(
    task_id="validate_with_gx_dataframe",
    configure_dataframe=configure_dataframe,
    configure_expectations=configure_expectations,
    result_format="SUMMARY",
    context_type="ephemeral",
)
```
configure_expectations signature

The configure_expectations callable receives the DataFrame as its first argument and must return an Expectation or ExpectationSuite.

GXValidateBatchOperator

Use this operator when your data is not in memory. You configure GX to connect directly to your data source by defining a BatchDefinition. This approach works with databases, warehouses, and file systems.

The provider includes helper functions to build connection strings from Airflow connections, so you do not need to duplicate credentials.

```python
from great_expectations_provider.operators.validate_batch import GXValidateBatchOperator
from great_expectations_provider.common.external_connections import build_snowflake_connection_string

SNOWFLAKE_CONN_ID = "snowflake_default"


def configure_batch_definition(context):
    """Configure a batch definition for a SQL table."""
    connection_string = build_snowflake_connection_string(
        conn_id=SNOWFLAKE_CONN_ID,
        schema="my_schema",
    )

    data_source = context.data_sources.add_sql(
        name="snowflake_ds",
        connection_string=connection_string,
    )

    table_asset = data_source.add_table_asset(
        name="daily_report_table",
        table_name="daily_planet_report",
    )

    return table_asset.add_batch_definition_whole_table(name="full_table")


def configure_expectations(context):
    """Define expectations and add them to the context."""
    import great_expectations.expectations as gxe
    from great_expectations import ExpectationSuite

    return context.suites.add_or_update(
        ExpectationSuite(
            name="daily_report_batch_suite",
            expectations=[
                gxe.ExpectColumnValuesToNotBeNull(column="planet_name"),
                gxe.ExpectColumnValuesToNotBeNull(column="total_passengers"),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_net_fare_usd", min_value=0
                ),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_discounts_usd", min_value=0
                ),
            ],
        )
    )


_validate_batch = GXValidateBatchOperator(
    task_id="validate_with_gx_batch",
    configure_batch_definition=configure_batch_definition,
    configure_expectations=configure_expectations,
    result_format="SUMMARY",
    context_type="ephemeral",
)
```

For file-based data (CSV, Parquet), use add_pandas_filesystem instead:

```python
def configure_batch_definition(context):
    """Configure a batch definition to read from a CSV file."""
    from pathlib import Path

    data_source = context.data_sources.add_pandas_filesystem(
        name="my_datasource",
        base_directory=Path("/path/to/data"),
    )
    csv_asset = data_source.add_csv_asset(name="daily_report_csv")
    return csv_asset.add_batch_definition_path(
        name="daily_report_batch",
        path="daily_report.csv",
    )
```
configure_expectations signature

For GXValidateBatchOperator, the configure_expectations callable receives the GX context as its first argument (not the DataFrame). Use context.suites.add_or_update() to register your suite.

GXValidateCheckpointOperator

Use this operator when you need the full power of GX Core, including the ability to trigger actions based on validation results. This requires the most configuration. You define a Checkpoint with a BatchDefinition, ExpectationSuite, and ValidationDefinition.

```python
from great_expectations_provider.operators.validate_checkpoint import GXValidateCheckpointOperator
from great_expectations_provider.common.external_connections import build_snowflake_connection_string

SNOWFLAKE_CONN_ID = "snowflake_default"


def configure_checkpoint(context):
    """Configure a full GX checkpoint with validation and actions."""
    import great_expectations.expectations as gxe
    from great_expectations import Checkpoint, ExpectationSuite, ValidationDefinition

    connection_string = build_snowflake_connection_string(
        conn_id=SNOWFLAKE_CONN_ID,
        schema="my_schema",
    )

    batch_definition = (
        context.data_sources.add_sql(
            name="snowflake_ds",
            connection_string=connection_string,
        )
        .add_table_asset(
            name="daily_report_table",
            table_name="daily_planet_report",
        )
        .add_batch_definition_whole_table(name="full_table")
    )

    expectation_suite = context.suites.add(
        ExpectationSuite(
            name="daily_report_checkpoint_suite",
            expectations=[
                gxe.ExpectColumnValuesToNotBeNull(column="planet_name"),
                gxe.ExpectColumnValuesToNotBeNull(column="total_passengers"),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_net_fare_usd", min_value=0
                ),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_discounts_usd", min_value=0
                ),
            ],
        )
    )

    validation_definition = context.validation_definitions.add(
        ValidationDefinition(
            name="daily_report_validation",
            data=batch_definition,
            suite=expectation_suite,
        )
    )

    return context.checkpoints.add(
        Checkpoint(
            name="daily_report_checkpoint",
            validation_definitions=[validation_definition],
            actions=[],  # Add SlackNotificationAction, EmailAction, etc.
        )
    )


_validate_checkpoint = GXValidateCheckpointOperator(
    task_id="validate_with_gx_checkpoint",
    configure_checkpoint=configure_checkpoint,
    context_type="ephemeral",
)
```