Apache Airflow®, Airflow, and the Airflow logo are trademarks of the Apache Software Foundation. Copyright © Astronomer 2026. All rights reserved.


Orchestrate Great Expectations with Airflow


Great Expectations (GX) is an open source, Python-based data validation framework. You test your data by expressing what you “expect” from it as simple declarative statements in JSON or YAML. You then run validations with those Expectation Suites against SQL data, filesystem data, or a pandas DataFrame. The airflow-provider-great-expectations package provides operators for running Great Expectations validations directly in your Dags.

Version 1.0.0 of the provider introduces three specialized operators that replace the legacy GreatExpectationsOperator:

| Operator | Use case |
| --- | --- |
| GXValidateDataFrameOperator | Validate in-memory Spark or Pandas DataFrames |
| GXValidateBatchOperator | Validate data that is not in memory, using a BatchDefinition |
| GXValidateCheckpointOperator | Most feature-rich: supports triggering actions based on validation results |

Requirements: Python 3.10+, Great Expectations 1.7.0+, Apache Airflow 2.1+.

Install with:

```bash
pip install airflow-provider-great-expectations
```

Each operator has its own import path:

```python
from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator
from great_expectations_provider.operators.validate_batch import GXValidateBatchOperator
from great_expectations_provider.operators.validate_checkpoint import GXValidateCheckpointOperator
```

Choosing the right operator

When deciding which operator fits your use case, consider:

  1. Where is your data? Is it in memory as a DataFrame, or in an external data source?
  2. Do you need to trigger actions, such as sending notifications or updating external systems based on validation results?
  3. What Data Context do you need? Use an ephemeral context for stateless validations, or a persistent one to track results over time.

| Scenario | Recommended operator |
| --- | --- |
| Data is already in memory as a Pandas or Spark DataFrame | GXValidateDataFrameOperator |
| Data is in a database, warehouse, or file system | GXValidateBatchOperator |
| You need to trigger Slack notifications, emails, or other actions | GXValidateCheckpointOperator |
| You want full GX Core features with ValidationDefinitions | GXValidateCheckpointOperator |
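
The scenario table reduces to a short decision rule. Actions or ValidationDefinitions require a checkpoint. Otherwise, the location of the data decides. The sketch below encodes that rule. recommend_operator and its flags are hypothetical and exist only to illustrate the table; they are not part of the provider.

```python
def recommend_operator(
    data_in_memory: bool,
    needs_actions: bool = False,
    needs_validation_definitions: bool = False,
) -> str:
    """Return the operator name the scenario table recommends (illustrative helper only)."""
    if needs_actions or needs_validation_definitions:
        # Only a checkpoint can run actions such as Slack or email notifications.
        return "GXValidateCheckpointOperator"
    if data_in_memory:
        # A Pandas or Spark DataFrame produced by an earlier step.
        return "GXValidateDataFrameOperator"
    # A database, warehouse, or file system that GX reads through a BatchDefinition.
    return "GXValidateBatchOperator"


if __name__ == "__main__":
    print(recommend_operator(data_in_memory=False))
```

Actions take priority because neither the DataFrame nor the batch operator can trigger them, regardless of where the data lives.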

GXValidateDataFrameOperator

Use this operator when your data is already in memory as a Pandas or Spark DataFrame. This is the simplest option: you only need a DataFrame and your expectations.

GXValidateDataFrameOperator example
```python
from great_expectations_provider.operators.validate_dataframe import GXValidateDataFrameOperator


def configure_dataframe():
    """Load data into a DataFrame using an Airflow hook."""
    # Importing inside the callable keeps top-level Dag parsing lightweight.
    from airflow.providers.common.sql.hooks.sql import DbApiHook

    # get_hook returns the hook class that matches the connection's type.
    hook = DbApiHook.get_hook("my_db_conn")
    return hook.get_pandas_df("SELECT * FROM daily_planet_report")


def configure_expectations(dataframe):
    """Define expectations for the DataFrame."""
    import great_expectations as gx

    suite = gx.ExpectationSuite(name="daily_report_suite")
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToNotBeNull(column="planet_name")
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToNotBeNull(column="total_passengers")
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="total_net_fare_usd", min_value=0
        )
    )
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column="total_discounts_usd", min_value=0
        )
    )
    return suite


# Create the task inside your Dag definition.
_validate_dataframe = GXValidateDataFrameOperator(
    task_id="validate_with_gx_dataframe",
    configure_dataframe=configure_dataframe,
    configure_expectations=configure_expectations,
    result_format="SUMMARY",
    context_type="ephemeral",
)
```
configure_expectations signature

The configure_expectations callable receives the DataFrame as its first argument and must return an Expectation or ExpectationSuite.
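
To make the four Expectations in the example concrete, the sketch below re-implements their checks in plain Python over a list of row dictionaries. It is a teaching aid and does not reflect how GX evaluates data. The helper names (expect_not_null, expect_between, validate_daily_report) are hypothetical. The result keys only borrow GX's vocabulary (success, element_count, unexpected_count). Skipping nulls in the range check reflects our understanding of GX's default for column map Expectations, which leaves null detection to the not-null Expectation.

```python
from typing import Any, Optional

Row = dict[str, Any]


def expect_not_null(rows: list[Row], column: str) -> dict[str, Any]:
    """Every row must carry a non-null value for `column`."""
    unexpected_count = sum(1 for row in rows if row.get(column) is None)
    return {
        "success": unexpected_count == 0,
        "element_count": len(rows),
        "unexpected_count": unexpected_count,
    }


def expect_between(
    rows: list[Row],
    column: str,
    min_value: Optional[float] = None,
    max_value: Optional[float] = None,
) -> dict[str, Any]:
    """Non-null values must fall inside the inclusive [min_value, max_value] range."""
    # Nulls are skipped here; the not-null check is responsible for them.
    values = [row[column] for row in rows if row.get(column) is not None]
    unexpected = [
        value
        for value in values
        if (min_value is not None and value < min_value)
        or (max_value is not None and value > max_value)
    ]
    return {
        "success": not unexpected,
        "element_count": len(rows),
        "unexpected_count": len(unexpected),
    }


def validate_daily_report(rows: list[Row]) -> tuple[bool, dict[str, dict[str, Any]]]:
    """Run the same four checks as the example suite and report overall success."""
    results = {
        "planet_name not null": expect_not_null(rows, "planet_name"),
        "total_passengers not null": expect_not_null(rows, "total_passengers"),
        "total_net_fare_usd >= 0": expect_between(rows, "total_net_fare_usd", min_value=0),
        "total_discounts_usd >= 0": expect_between(rows, "total_discounts_usd", min_value=0),
    }
    return all(result["success"] for result in results.values()), results
```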

GXValidateBatchOperator

Use this operator when your data is not in memory. You configure GX to connect directly to your data source by defining a BatchDefinition. This approach works with databases, warehouses, and file systems.

The provider includes helper functions to build connection strings from Airflow connections, so you do not need to duplicate credentials.
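
The sketch below shows the kind of string such a helper produces. It assembles a snowflake-sqlalchemy URL of the form snowflake://<user>:<password>@<account>/<database>/<schema>?warehouse=<warehouse>&role=<role> from plain arguments. The function snowflake_sqlalchemy_url is hypothetical and is not the provider's build_snowflake_connection_string. The provider reads the values from the Airflow connection identified by conn_id; which connection fields it reads is not shown here.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def snowflake_sqlalchemy_url(
    user: str,
    password: str,
    account: str,
    database: str,
    schema: str,
    warehouse: Optional[str] = None,
    role: Optional[str] = None,
) -> str:
    """Assemble a snowflake-sqlalchemy URL (hypothetical helper, not the provider's)."""
    # Percent-encode credentials so characters such as '@' or '/' cannot break the URL.
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    url = f"snowflake://{credentials}@{account}/{database}/{schema}"
    # Only include optional session parameters that were actually provided.
    params = {key: value for key, value in (("warehouse", warehouse), ("role", role)) if value}
    return f"{url}?{urlencode(params)}" if params else url
```

Percent-encoding the credentials matters because passwords often contain characters that are reserved in URLs.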

GXValidateBatchOperator example (SQL)
```python
from great_expectations_provider.operators.validate_batch import GXValidateBatchOperator
from great_expectations_provider.common.external_connections import build_snowflake_connection_string

SNOWFLAKE_CONN_ID = "snowflake_default"


def configure_batch_definition(context):
    """Configure a batch definition for a SQL table."""
    # Build the connection string from the Airflow connection instead of duplicating credentials.
    connection_string = build_snowflake_connection_string(
        conn_id=SNOWFLAKE_CONN_ID,
        schema="my_schema",
    )

    data_source = context.data_sources.add_sql(
        name="snowflake_ds",
        connection_string=connection_string,
    )

    table_asset = data_source.add_table_asset(
        name="daily_report_table",
        table_name="daily_planet_report",
    )

    # Validate every row of the table as a single batch.
    return table_asset.add_batch_definition_whole_table(name="full_table")


def configure_expectations(context):
    """Define expectations and add them to the context."""
    import great_expectations.expectations as gxe
    from great_expectations import ExpectationSuite

    # add_or_update replaces a suite with the same name instead of failing if one already exists.
    return context.suites.add_or_update(
        ExpectationSuite(
            name="daily_report_batch_suite",
            expectations=[
                gxe.ExpectColumnValuesToNotBeNull(column="planet_name"),
                gxe.ExpectColumnValuesToNotBeNull(column="total_passengers"),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_net_fare_usd", min_value=0
                ),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_discounts_usd", min_value=0
                ),
            ],
        )
    )


# Create the task inside your Dag definition.
_validate_batch = GXValidateBatchOperator(
    task_id="validate_with_gx_batch",
    configure_batch_definition=configure_batch_definition,
    configure_expectations=configure_expectations,
    result_format="SUMMARY",
    context_type="ephemeral",
)
```
GXValidateBatchOperator example (file-based)

For file-based data (CSV, Parquet), use add_pandas_filesystem instead:

```python
def configure_batch_definition(context):
    """Configure a batch definition to read from a CSV file."""
    from pathlib import Path

    data_source = context.data_sources.add_pandas_filesystem(
        name="my_datasource",
        base_directory=Path("/path/to/data"),
    )
    csv_asset = data_source.add_csv_asset(name="daily_report_csv")
    # The path is relative to base_directory.
    return csv_asset.add_batch_definition_path(
        name="daily_report_batch",
        path="daily_report.csv",
    )
```
configure_expectations signature

For GXValidateBatchOperator, the configure_expectations callable receives the GX context as its first argument (not the DataFrame). Use context.suites.add_or_update() to register your suite.
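
Because configure_expectations receives the context, the way you register the suite matters when a context outlives a single task run. The sketch below contrasts a strict add with an idempotent add_or_update using a hypothetical in-memory SuiteRegistry. It mimics the behavior we expect from context.suites.add and context.suites.add_or_update, but it is not GX code. The exact error GX raises on a duplicate name is not reproduced here.

```python
class SuiteRegistry:
    """Hypothetical in-memory stand-in for a Data Context's suite store (not GX's API)."""

    def __init__(self) -> None:
        self._suites: dict[str, list[str]] = {}

    def add(self, name: str, expectations: list[str]) -> list[str]:
        # Strict add: refuse to overwrite a suite that is already registered.
        if name in self._suites:
            raise ValueError(f"suite {name!r} already exists")
        self._suites[name] = list(expectations)
        return self._suites[name]

    def add_or_update(self, name: str, expectations: list[str]) -> list[str]:
        # Idempotent: the latest definition replaces any stored one.
        self._suites[name] = list(expectations)
        return self._suites[name]
```

With an ephemeral context, every run starts empty, so either method works. With a persistent context, add_or_update lets the same task run repeatedly without a name clash.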

GXValidateCheckpointOperator

Use this operator when you need the full power of GX Core, including the ability to trigger actions based on validation results. It also requires the most configuration. You define a Checkpoint that runs one or more ValidationDefinitions, each of which pairs a BatchDefinition with an ExpectationSuite. The Checkpoint then triggers its actions.

GXValidateCheckpointOperator example
```python
from great_expectations_provider.operators.validate_checkpoint import GXValidateCheckpointOperator
from great_expectations_provider.common.external_connections import build_snowflake_connection_string

SNOWFLAKE_CONN_ID = "snowflake_default"


def configure_checkpoint(context):
    """Configure a full GX checkpoint with validation and actions."""
    import great_expectations.expectations as gxe
    from great_expectations import Checkpoint, ExpectationSuite, ValidationDefinition

    connection_string = build_snowflake_connection_string(
        conn_id=SNOWFLAKE_CONN_ID,
        schema="my_schema",
    )

    # Data source, then table asset, then a batch definition covering the whole table.
    batch_definition = (
        context.data_sources.add_sql(
            name="snowflake_ds",
            connection_string=connection_string,
        )
        .add_table_asset(
            name="daily_report_table",
            table_name="daily_planet_report",
        )
        .add_batch_definition_whole_table(name="full_table")
    )

    expectation_suite = context.suites.add(
        ExpectationSuite(
            name="daily_report_checkpoint_suite",
            expectations=[
                gxe.ExpectColumnValuesToNotBeNull(column="planet_name"),
                gxe.ExpectColumnValuesToNotBeNull(column="total_passengers"),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_net_fare_usd", min_value=0
                ),
                gxe.ExpectColumnValuesToBeBetween(
                    column="total_discounts_usd", min_value=0
                ),
            ],
        )
    )

    # A ValidationDefinition pairs the batch definition with the suite.
    validation_definition = context.validation_definitions.add(
        ValidationDefinition(
            name="daily_report_validation",
            data=batch_definition,
            suite=expectation_suite,
        )
    )

    # The checkpoint runs its validation definitions, then triggers its actions.
    return context.checkpoints.add(
        Checkpoint(
            name="daily_report_checkpoint",
            validation_definitions=[validation_definition],
            actions=[],  # Add SlackNotificationAction, EmailAction, etc.
        )
    )


# Create the task inside your Dag definition.
_validate_checkpoint = GXValidateCheckpointOperator(
    task_id="validate_with_gx_checkpoint",
    configure_checkpoint=configure_checkpoint,
    context_type="ephemeral",
)
```
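
The actions list in the example above is empty. The sketch below emulates, in plain Python, how a checkpoint ties actions to validation results. The checkpoint succeeds only if every validation succeeds, and each action fires according to a notify_on setting of "all", "success", or "failure". Action and run_checkpoint are hypothetical stand-ins. In a real Dag you would put GX action objects such as SlackNotificationAction in the actions list; confirm their parameters in the GX Core reference.

```python
from dataclasses import dataclass
from typing import Callable, Literal


@dataclass
class Action:
    """Hypothetical stand-in for a GX checkpoint action such as a Slack notification."""

    name: str
    notify_on: Literal["all", "success", "failure"]
    send: Callable[[str], None]

    def should_fire(self, success: bool) -> bool:
        # "all" always fires; otherwise fire only when the outcome matches.
        return {"all": True, "success": success, "failure": not success}[self.notify_on]


def run_checkpoint(name: str, validation_results: list[bool], actions: list[Action]) -> bool:
    """Combine validation outcomes, then trigger each matching action."""
    success = all(validation_results)
    status = "succeeded" if success else "failed"
    for action in actions:
        if action.should_fire(success):
            action.send(f"[{action.name}] {name} {status}")
    return success
```

A failure-only action suits alerting channels, while an "all" action suits audit logs that should record every run.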