Get Improved Data Quality Checks in Airflow with the Updated Great Expectations Operator

  • Benji Lampel
  • Tamara Fingerlin

As data quality becomes a core concern for leading data companies, the number of data quality checks running on Airflow pipelines is increasing rapidly, creating a greater need to integrate Airflow with specialized data quality tools like Great Expectations. We at Astronomer have been working with Superconductive, the team behind Great Expectations, to continuously improve the GreatExpectationsOperator and provide a streamlined, Airflow-centric experience.

Recently, Astronomer took ownership of the Great Expectations Airflow Provider in a continued effort to maintain the operator and provide an Airflow-centric way to use Great Expectations. The first release we made comes with some great improvements and upgrades. Many thanks to the Superconductive team for their help!

The previous versions of the GreatExpectationsOperator required a Data Context, Checkpoint, and Expectation Suite. The new version provides greater ease of use and flexibility: just initialize a project, write your Expectations, and the operator will do the rest.

Updates and refinements are described in the sections that follow.

Improving a Data Quality Process by Adding Great Expectations

Despite its importance, data quality in pipelines is still largely an afterthought for data teams. With pressure to get data to the warehouse, understanding whether or not it’s the right data becomes a problem for the next sprint, then the next one, and then the one after that. But it shouldn’t be this way. Data quality should be a primary concern when building data pipelines, and it should be easy to include quality checks when a pipeline is first written.

Great Expectations simplifies the data quality check experience by converting SQL and Python checks into simple JSON templates called Expectations. Each Expectation comes with configuration options and is intuitively named in a human-readable way (for example the expect_column_distinct_values_to_be_in_set Expectation). This makes writing checks both easy and flexible, whether your data is in a relational database table or a Spark or pandas dataframe.
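As a sketch, a single Expectation of this kind is stored as a JSON entry with the Expectation's name and its keyword arguments (the column name and value set below are hypothetical, for illustration only):

```json
{
  "expectation_type": "expect_column_distinct_values_to_be_in_set",
  "kwargs": {
    "column": "order_status",
    "value_set": ["PENDING", "SHIPPED", "DELIVERED"]
  },
  "meta": {}
}
```

The same template shape works whether the underlying data asset is a database table or a dataframe.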

Why You Might Use Great Expectations in Your Airflow DAGs

If your organization is just starting its data quality journey, or it's on the fence about switching tools, there are several reasons to consider Great Expectations.

At Astronomer, we use Great Expectations for specific use cases within our data quality approach to add more detail about check successes and failures. As Great Expectations integrates natively with OpenLineage, we also get a more robust set of information about our datasets in the Lineage tab of our Astro deployments.

How We Added Great Expectations to Our Environment

Setting up a new project always comes with pain points, so we at Astronomer thought hard about making the configuration of Great Expectations in an Airflow environment as simple as possible. Notably, we’ve removed steps involving Jupyter notebooks and configuring YAML files. With the new GreatExpectationsOperator, all you need to do is init a project and write your Expectations.

Setting up a Great Expectations Environment in Airflow

The steps to add Great Expectations functionality to an Airflow deployment are simple:

  1. If you don’t have an existing Airflow setup, install the open-source Astro CLI and run astro dev init in your project directory.
  2. Install Great Expectations locally by running pip install great_expectations.
  3. Build the file structure inside the include folder of your Airflow instance with the command cd /path/to/airflow/project/include && great_expectations init.
  4. Add the Great Expectations Provider to your Airflow instance’s requirements.txt file.
  5. Turn on XCom pickling by setting the environment variable AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True. Alternatively, you can use a custom serialization method in a custom XCom backend.

Within minutes you’ll have a functioning Great Expectations environment in your Airflow instance, and you’ll be ready to use the new GreatExpectationsOperator.

Dynamic Task Mapping

In our internal use case, we used automatic datasource creation with the new GreatExpectationsOperator by passing in the connection ID (snowflake_connection) of our preexisting Airflow connection to our Snowflake database. After adding the /path/to/airflow/project/include/great_expectations file path as the data_context_root_dir parameter, we were ready to pass in the names of our tables together with the corresponding Expectation Suites that run checks on each table. By leaving the checkpoint_config and checkpoint_name parameters unset, we told the operator to create a default Checkpoint for us.

Since most of the GreatExpectationsOperator's configuration stays the same no matter which table an Expectation Suite runs on, this was a great use case for dynamic task mapping.

The code snippet below shows all the code within our Great Expectations DAG. It’s short but powerful!

    from airflow.decorators import task
    from great_expectations_provider.operators.great_expectations import (
        GreatExpectationsOperator,
    )

    # Path to the Great Expectations project inside the Airflow deployment
    gx_root_dir = "/path/to/airflow/project/include/great_expectations"

    list_of_checked_tables = [
        "SCHEMA_NAME_1.TABLE_NAME_1",
        "SCHEMA_NAME_1.TABLE_NAME_2",
        "SCHEMA_NAME_2.TABLE_NAME_1",
    ]

    @task
    def create_table_suite_pairs(table):
        """Create pairs of table and Expectation Suite names."""
        return {
            "data_asset_name": table,
            "expectation_suite_name": table.replace(".", "_") + "_SUITE",
        }

    # One mapped task instance is created per table in the list
    gx_data_quality_checks = GreatExpectationsOperator.partial(
        task_id="gx_data_quality_checks",
        conn_id="snowflake_connection",
        data_context_root_dir=gx_root_dir,
    ).expand_kwargs(
        create_table_suite_pairs.expand(table=list_of_checked_tables)
    )

In this example, we provided a list of tables in the format "SCHEMA_NAME.TABLE_NAME" on which we want to run Expectation Suites. For each of these tables, we created an Expectation Suite in the include/great_expectations/expectations/ folder with the name of SCHEMA_NAME_TABLE_NAME_SUITE.

The create_table_suite_pairs task dynamically maps over the provided list of table names. The task creates a dictionary for each table name that contains matched inputs for two GreatExpectationsOperator parameters: data_asset_name and expectation_suite_name.

The resulting list of dictionaries is passed to the GreatExpectationsOperator to map over dynamically, creating a mapped task instance for every table that is being checked.
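Conceptually, the mapping stage materializes one kwargs dictionary per table. The plain-Python sketch below (run outside of Airflow, with the same names as the DAG above) shows what expand_kwargs receives:

```python
# Plain-Python sketch of the kwargs produced for each mapped task instance.
# In the real DAG, Airflow calls create_table_suite_pairs once per table and
# expands GreatExpectationsOperator over the resulting dictionaries.
list_of_checked_tables = [
    "SCHEMA_NAME_1.TABLE_NAME_1",
    "SCHEMA_NAME_1.TABLE_NAME_2",
    "SCHEMA_NAME_2.TABLE_NAME_1",
]

def create_table_suite_pairs(table):
    return {
        "data_asset_name": table,
        "expectation_suite_name": table.replace(".", "_") + "_SUITE",
    }

mapped_kwargs = [create_table_suite_pairs(t) for t in list_of_checked_tables]
print(mapped_kwargs[0])
# {'data_asset_name': 'SCHEMA_NAME_1.TABLE_NAME_1',
#  'expectation_suite_name': 'SCHEMA_NAME_1_TABLE_NAME_1_SUITE'}
```

Each dictionary in mapped_kwargs becomes one mapped task instance of gx_data_quality_checks.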

The graph view in Figure 1 (below) shows the two sequential tasks both having 13 dynamically mapped task instances.


Figure 1. Screenshot of the Airflow UI with the Graph view of the Great Expectations DAG showing two dynamically mapped tasks.

In the grid view in Figure 2 (below) we can see any check failures in any mapped task instance. A data quality issue was found in the table checked in mapped task instance No. 10.


Figure 2. Screenshot of the Airflow UI with the Grid view of the Great Expectations DAG showing 13 dynamically mapped task instances of the gx_data_quality_checks task. In this example, 12 mapped task instances completed successfully and one mapped task instance failed.

Easy Schema Checking with Great Expectations

Great Expectations simplifies the code to check for changes in table schemas. In the past, when we wanted to make sure the current list of columns of a table in the data warehouse matched a provided list of columns, we used a custom SQL statement with the SQLCheckOperator that included self-joins. With Great Expectations, we’re able to remove that complicated SQL statement in favor of the expect_table_columns_to_match_set Expectation.
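At its core, the Expectation performs a set comparison between observed and expected columns. A minimal plain-Python sketch of that logic (not the Great Expectations implementation itself):

```python
def columns_match_set(observed_columns, expected_columns, exact_match=True):
    """Sketch of the set comparison behind expect_table_columns_to_match_set.

    With exact_match=True, the table's columns must equal the expected set
    exactly; with exact_match=False, the expected columns only need to be
    a subset of the observed ones.
    """
    observed, expected = set(observed_columns), set(expected_columns)
    if exact_match:
        return observed == expected
    return expected <= observed

# A newly added column makes the exact-match check fail:
print(columns_match_set(["id", "name", "cluster_type"], ["id", "name"]))  # False
print(columns_match_set(["id", "name", "cluster_type"], ["id", "name"], exact_match=False))  # True
```

Expressing this as a named Expectation replaces the self-joining SQL while keeping the intent readable.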

This check is crucial because it alerts data quality stakeholders to any schema changes that might prompt the writing of additional data quality checks on a specific table.

Using this Expectation in a Suite is straightforward — you supply the name of the Expectation Suite as well as the list of known columns. Here’s an example:

    {
      "data_asset_type": null,
      "expectation_suite_name": "<SCHEMA_NAME_TABLE_NAME_SUITE>",
      "expectations": [
        {
          "expectation_context": {
            "description": null
          },
          "expectation_type": "expect_table_columns_to_match_set",
          "ge_cloud_id": null,
          "kwargs": {
            "column_set": ["<column 1>", "<column 2>", …, "<column n>"],
            "exact_match": true
          },
          "meta": {}
        }
      ],
      "ge_cloud_id": null,
      "meta": {
        "great_expectations_version": "0.15.34"
      }
    }

Running Great Expectations on Astro

One of the reasons teams choose to integrate Great Expectations with Airflow is the increased observability of data quality checks. Astro takes this integration a step further — with its built-in data lineage, Astro integrates seamlessly with Great Expectations. You just deploy the changes from your local environment via your existing setup and view Lineage events from the GreatExpectationsOperator in the Astro Lineage tab.

In Figure 3 (below) the Lineage graph shows the connection between tasks using the GreatExpectationsOperator and the datasets they are running data quality checks on (top quarter of the screenshot). The results from checks on each individual column can be viewed in the Quality tab, right down to which individual Expectations succeeded and which failed.


Figure 3. Screenshot of the Lineage graph in the Astro UI showing lineage events recorded from the GreatExpectationsOperator.

Additionally, if you configure your Great Expectations Data Docs to be saved in S3, Azure, or GCS, an extra link will appear on tasks using the new GreatExpectationsOperator, leading to the Data Docs page of each specific Expectation Suite.


Figure 4. Screenshot of the Data Docs page for a validation run of the Expectation Suite on a table called CLUSTERS. Out of 13 Expectations, one was unsuccessful. The schema check shows the mismatch between the expected schema and the observed one: a new column, cluster_type, was added.

Conclusion

You can now benefit from all of Great Expectations' features while having Airflow handle much of the configuration for you, and still retain full control and backwards compatibility with previous versions of the operator. Migrating from other data quality approaches, such as Airflow-native data quality check operators, is often just a matter of copying values into Expectation Suites using one of the many pre-defined Expectations.
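For instance, a not-null check that might previously have lived in a SQL check operator maps directly onto the pre-defined expect_column_values_to_not_be_null Expectation (the column name below is hypothetical):

```json
{
  "expectation_type": "expect_column_values_to_not_be_null",
  "kwargs": { "column": "customer_id" },
  "meta": {}
}
```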

If you want to improve trust in your data with Great Expectations, check out the resources at the bottom of the page.

Ready to Get Started?


Try Astro free for 14 days and power your next big data project.