Webinar Recap

Testing Airflow to Bullet Proof Your Code

Agenda

  • Testing in Python
  • Testing in Airflow

    • DAG integrity test
    • Unit testing in Airflow 101
    • Unit testing with templated arguments
    • Unit testing using mocking
    • Unit testing using Docker
    • Integration testing
    • DTAP environments
    • Airflow CLI
    • CI/CD

1. Testing in Python

Python testing frameworks:

  • unittest (Python built-in)
  • pytest (used by Airflow)
  • many more

Typical project structure for testing:

├── dags
│   ├── foo_dag.py
│   ├── bar_dag.py
│   └── hello_world_dag.py
├── mypackage
│   ├── __init__.py
│   └── magic.py
└── tests
    ├── dags
    │   └── test_dag_integrity.py
    ├── mypackage
    │   └── test_magic.py
    ├── test_1.py
    └── test_2.py

A good project structure mimics the structure of your actual project.

Example: a test that checks whether your multiply function indeed multiplies correctly.
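A minimal version of such a check might look like this (a sketch; the multiply function and test name are illustrative):

```python
# A multiply function and a pytest-style test that verifies it.
def multiply(a, b):
    return a * b


def test_multiply():
    # pytest collects functions named test_* and runs the asserts inside.
    assert multiply(2, 3) == 6
    assert multiply(-1, 5) == -5
```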

Why Pytest? Pytest fixtures provide reusable building blocks, which give you flexibility:

import pytest

def multiply(a, b):
    return a * b

@pytest.fixture
def a():
    return 2

@pytest.fixture
def b():
    return 3

def test_multiply(a, b):
    assert multiply(a, b) == 6

2. DAG Integrity Test

a) Filters out errors such as:

  • Missing required arguments (such as a forgotten DAG ID)
  • Duplicate DAG IDs
  • Cycles in DAGs

This catches a lot of silly programming mistakes, so it's super useful at the beginning of your testing effort.

from airflow.models import DagBag

def test_dagbag():
    dag_bag = DagBag(include_examples=False)  # Loads all DAGs in $AIRFLOW_HOME/dags
    assert not dag_bag.import_errors  # Import errors aren't raised but captured, to ensure all DAGs are parsed

b) Can be used to enforce conventions, e.g. “does each DAG have a tag?”

from airflow.models import DagBag

def test_dagbag():
    dag_bag = DagBag(include_examples=False)
    assert not dag_bag.import_errors
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags  # Assert dag.tags is not empty

c) Performing additional tests on all the DAGs

The DagBag has an attribute called dags: a dictionary mapping DAG IDs to DAG objects. Iterating over it lets you check certain things across all your DAGs, such as: does each DAG have a tag?

3. Unit testing in Airflow

a) Simplest way to test an operator in Airflow:

call execute() on operator

Context is required, but it can be an empty dictionary.

from airflow.operators.bash import BashOperator

def test_bash_operator():
    test = BashOperator(task_id="test", bash_command="echo hello")
    result = test.execute(context={})  # Each operator implements execute()
    assert result == "hello"  # BashOperator returns the last line written to stdout

b) PythonOperator unit test with task context

The task context can be supplied by hand - for example, if your task uses the execution date, you can simply pass one in yourself.

import datetime

from airflow.operators.python import PythonOperator

def test_python_operator():

    def return_today(**context):
        return f"Today is {context['execution_date'].strftime('%d-%m-%Y')}"

    test = PythonOperator(task_id="test", python_callable=return_today)
    result = test.execute(context={"execution_date": datetime.datetime(2021, 1, 1)})
    assert result == "Today is 01-01-2021"

This basic unit testing breaks down as soon as templating is involved, because execute() does not render templates. For example:

def test_bash_operator_template():
    test = BashOperator(task_id="test", bash_command="echo 'Today is {{ execution_date }}'")
    result = test.execute(context={"execution_date": datetime.datetime(2021, 1, 1)})
    assert result == "Today is 01-01-2021"

This fails because the template string is never rendered:

AssertionError: assert 'Today is {{ execution_date }}' == 'Today is 01-01-2021'

Testing templated arguments requires something a bit more involved.

4. Unit testing with templated arguments

  • Templated arguments are rendered before execute() is called
  • Therefore you cannot use execute() when testing templated arguments
  • Call run() instead
  • Set start_date and end_date to the same date so that exactly one task instance runs (for a predictable outcome)

Requires:

  • Airflow metastore
  • DAG

Manually creating a (local) metastore for testing:

airflow db init

This will create files in your home directory:

  • airflow.db
  • airflow.cfg
  • webserver_config.py

Not ideal - you probably don't want to spam your home folder with these files, so:

export AIRFLOW_HOME=[your project dir]
airflow db init

(make sure AIRFLOW_HOME is also set when running pytest)

But it's still a bit impractical to initialize a metastore for every test...

Instead, you can use this magical fixture in tests/conftest.py to automatically reset the Airflow metastore once per pytest session:

import os

import pytest

os.environ["AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS"] = "False"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW_HOME"] = os.path.dirname(os.path.dirname(__file__))

@pytest.fixture(autouse=True, scope="session")
def reset_db():
    from airflow.utils import db

    db.resetdb()
    yield

Best practices:

  • Use a separate metastore for testing
  • Do not load the default connections
  • Do not load the example DAGs
  • Optionally, clean up the temp files afterwards
  • Task context is fetched from various places, including the DAG, which is why run() requires a DAG; call dag.clear() to avoid state left over from old or other task runs

Oops! run() has no return value, because it can be responsible for running multiple task instances, so there is nothing sensible for it to return - what to do?

  • Therefore: write the output to a temp file
  • Read the file content to assert the result
  • tmp_path is a pytest built-in fixture providing a temporary directory

import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator

def test_bash_operator(tmp_path):
    with DAG(dag_id="test_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval="@daily") as dag:
        output_file = tmp_path / "output.txt"
        test = BashOperator(task_id="test", bash_command="echo {{ ds_nodash }} > " + str(output_file))
        dag.clear()
        test.run(
            start_date=dag.start_date,
            end_date=dag.start_date,
            ignore_first_depends_on_past=True,
            ignore_ti_state=True,
        )

        assert output_file.read_text() == "20210101\n"

5. Unit testing using mocking

Mocking is typically useful when your code talks to an external system: you want to run the test on your laptop, but you don't have access to the production database or other production systems.

Mocking = replace functionality with “fake” behaviour

  • Avoid call to external system
  • Return predictable outcome

If, for example, you call an API that returns unpredictable results, you can use mocking to always get one single, predictable outcome.

Mocking requires knowledge of the internal code: you need to know exactly which method is responsible for making the call to the external system (e.g. the metastore), so that you can patch that method.
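As a plain-Python illustration (outside Airflow - the URL and fetch_user function are made up for this example), you patch the one function that reaches out to the external system:

```python
import json
import urllib.request
from unittest import mock


def fetch_user(user_id):
    # In production this would hit a real HTTP API.
    resp = urllib.request.urlopen(f"https://api.example.com/users/{user_id}")
    return json.loads(resp.read())


def test_fetch_user():
    # Patch the exact function that talks to the external system,
    # so no network call is made and the outcome is predictable.
    with mock.patch("urllib.request.urlopen") as urlopen_mock:
        urlopen_mock.return_value.read.return_value = b'{"name": "alice"}'
        assert fetch_user(42) == {"name": "alice"}
```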

Different ways to specify mock.patch():

  • As a context manager:

with mock.patch("airflow.models.variable.Variable.get") as variable_get_mock:
    variable_get_mock.do_something()

  • As a decorator:

    • mock objects are provided as arguments in inverse order (bottom decorator first)
    • avoids nested with … as … blocks

@mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.get_connection")
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
def test_postgres_demo(base_hook_mock, pg_hook_mock, other_fixture):
    ...
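To see the "inverse order" rule in isolation, here is a pure-Python sketch that patches stdlib functions instead of Airflow hooks (the function name is illustrative):

```python
import os
from unittest import mock


@mock.patch("os.path.exists")  # top decorator -> last mock argument
@mock.patch("os.getcwd")       # decorator closest to the function -> first mock argument
def check_order(getcwd_mock, exists_mock):
    getcwd_mock.return_value = "/fake/dir"
    exists_mock.return_value = True
    # Inside this function, both patched calls return the mocked values.
    return os.getcwd(), os.path.exists("/does/not/matter")
```

Calling check_order() returns ("/fake/dir", True), and both patches are undone as soon as the function exits.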

6. Unit testing using Docker

Sometimes there are things that cannot be tested by mocking because:

  • They require a lot of knowledge of the internals of a certain system
  • There's just no nice mocking library available

Docker allows testing against “real” system.

  • pytest-docker-tools plugin provides convenience fixtures for defining Docker containers

7. Integration testing

At some point you're done with testing individual operators, and you would like to know if multiple operators together actually do what you expect them to do.

You can run a complete DAG in a test.

You need a bit of knowledge of Airflow internals to do this - for example, that a DAG run has a state attribute you can assert on.

8. DTAP environments

Sometimes you just want to test against a real system. Ideally you have more than one system to do that, as you don't want to deploy into a production system and then trigger the DAG.

If you want to bring a DAG from a development branch into production, you typically have multiple branches, each corresponding to a specific Airflow environment. Via pull requests you then merge from one branch into the next, all the way up to the branch that deploys to your production environment.

Your test environment should mimic your production environment - remember?


9. Airflow CLI

airflow tasks test [dag_id] [task_id] [execution_date]

  • Can be useful if you need to debug in production
  • The run won't show up in the UI (the task instance is saved in the DB, though)

airflow dags test [dag_id] [execution_date]

  • Run complete DAG on terminal using DebugExecutor

10. CI/CD


  1. Static checks (Flake8, Black, Pylint, …) - run first, since they are small and quick
  2. Testing (generally done as the second step)
  3. Deploying the code
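A minimal GitHub Actions workflow implementing these steps might look like this (a sketch; job names, action versions, and paths are illustrative, not taken from the linked skeleton):

```yaml
name: CI
on: [push]

jobs:
  static-checks:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install flake8 black
      - run: flake8 dags tests        # static checks run first: small and quick
      - run: black --check dags tests

  tests:
    needs: static-checks              # testing only starts once static checks pass
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install apache-airflow pytest
      - run: pytest tests
```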

GitHub Actions example: https://github.com/astronomer/airflow-testing-skeleton/blob/master/.github/workflows/ci.yaml

Getting Apache Airflow Certified

Join the 1000s of other data engineers who have received the Astronomer Certification for Apache Airflow Fundamentals. This exam assesses an understanding of the basics of the Airflow architecture and the ability to create simple data pipelines for scheduling and monitoring tasks.