Use setup and teardown tasks in Airflow

Info

This page has not yet been updated for Airflow 3. The concepts shown are relevant, but some code may need to be updated. If you run any examples, take care to update import statements and watch for any other breaking changes.

In production Airflow environments, it’s a best practice to set up resources and configurations before certain tasks run, and then tear those resources down even if the tasks fail. This pattern can reduce resource utilization and save costs.

Starting in Airflow 2.7, you can use a special type of task to create and delete resources. In this guide, you will learn all about setup and teardown tasks in Airflow.

DAG with setup/ teardown - all successful

Other ways to learn

There are multiple resources for learning about this topic. See also:

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

When to use setup/teardown tasks

Setup/teardown tasks ensure that the necessary resources to run an Airflow task are set up before the task is executed and that those resources are torn down after the task has completed, regardless of any task failures.

Any existing Airflow task can be designated as a setup or teardown task, with special behavior and added visibility of the setup/ teardown relationship in the Airflow UI.

There are many use cases for setup and teardown tasks. For example, you might want to:

  • Manage a Spark cluster to run heavy workloads.
  • Manage compute resources to train an ML model.
  • Manage the resources to run data quality checks.
  • Set up storage in your custom XCom backend to hold data processed through Airflow tasks, then tear the extra storage down afterwards when the XCom data is no longer needed.

Setup/teardown concepts

Any task can be designated as a setup or a teardown task. A setup task, its teardown task, and the tasks in between constitute a setup/teardown workflow.

Tasks that run after a setup task and before the associated teardown task are considered to be in scope of the setup/teardown workflow. Usually, these tasks use the resources created by the setup task, which the teardown task later dismantles.
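For example, in the minimal sketch below (hypothetical task names, anticipating the implementation patterns shown later in this guide), worker_task is in scope of the workflow formed by my_setup_task and my_teardown_task:

my_setup_task_obj = my_setup_task()

(
    my_setup_task_obj
    >> worker_task()  # in scope: runs between the setup and the teardown task
    >> my_teardown_task().as_teardown(setups=my_setup_task_obj)
)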

Setup/teardown tasks have different behavior from regular tasks:

  • Clearing a task that is in scope of a setup/teardown workflow will also clear and rerun the associated setup and teardown tasks, ensuring that all resources the task needs are created again for the rerun and torn down after the task has completed.

  • A teardown task will run as long as at least one of its associated setup tasks has completed successfully and all of its upstream tasks have completed, regardless of whether they were successful. If all associated setup tasks fail or are skipped, the teardown task will fail or be skipped, respectively.

  • A teardown task without any associated setup tasks will always run once all upstream worker tasks have completed, regardless of whether they were successful.

  • When evaluating whether a DAG run was successful, Airflow ignores teardown tasks by default. This means that if a teardown task fails as the final task of a DAG, the DAG is still marked as having succeeded. In the example shown in the screenshot below, the DAG run state is not impacted by the failure of tear_down_cluster and is marked as successful. You can change this behavior by setting on_failure_fail_dagrun=True in the .as_teardown() method or @teardown decorator, as shown in the sketch after this list.

    Successful DAG with failed teardown

  • When a teardown task is within a task group and a dependency is set on the task group, the teardown task will be ignored when evaluating if a dependency has been met. For example, run_after_task_group, which is dependent on the work_in_the_cluster task group, will run even if the teardown task has failed or is still running.

    Task group with teardown

  • You can have a setup task without an associated teardown task and vice versa. If you define a setup task without a teardown task, everything downstream of the setup task is considered in its scope and will cause the setup task to rerun when cleared.
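For illustration, here is a minimal sketch (task names are assumed, loosely following the screenshots) of how to make a failed teardown task fail the whole DAG run:

provision_cluster_obj = provision_cluster()

(
    provision_cluster_obj
    >> worker_task()
    >> tear_down_cluster().as_teardown(
        setups=provision_cluster_obj,
        # assumed usage: a failed tear_down_cluster now also fails the DAG run
        on_failure_fail_dagrun=True,
    )
)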

Before and after using setup and teardown tasks

Setup and teardown tasks can help you write more robust DAGs by making sure resources are set up at the right moment and torn down even when worker tasks fail.

The following DAG is not using Airflow setup and teardown functionality. It sets up its resources using a standard task called provision_cluster, runs three worker tasks using those resources, and tears down the resources using the tear_down_cluster task.

DAG without Setup/ teardown - all successful

The way this DAG is set up, a failure in any of the worker tasks will lead to the tear_down_cluster task not running. This means that the resources will not be torn down and will continue to incur costs. Additionally, any downstream tasks depending on tear_down_cluster will also fail to run unless they have trigger rules to run independently of upstream failures.

DAG without setup/ teardown - upstream failure

In this example, you can turn the provision_cluster task into a setup task and the tear_down_cluster task into a teardown task using the patterns shown in Setup/teardown implementation.
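A minimal sketch of the converted DAG could look like the following; the exact worker task structure is assumed from the screenshots and the recovery example below:

provision_cluster_obj = provision_cluster()

(
    provision_cluster_obj
    >> worker_task_1()
    >> worker_task_2()
    >> worker_task_3()
    >> tear_down_cluster().as_teardown(setups=provision_cluster_obj)
    >> downstream_task()
)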

After you convert the tasks, the Grid view shows your setup tasks with an upwards arrow and teardown tasks with a downwards arrow. After you configure the setup/ teardown workflow between provision_cluster and tear_down_cluster, the tasks are connected by a dotted line. The tasks worker_task_1, worker_task_2 and worker_task_3 are in the scope of this setup/ teardown workflow.

DAG with setup/ teardown - all successful

Now, even if one of the worker tasks fails, like worker_task_2 in the following screenshot, the tear_down_cluster task will still run, the resources will be torn down, and downstream tasks will run successfully.

DAG with setup/ teardown - upstream failure

Additionally, when you clear any of the worker tasks, both the setup and teardown tasks will also be cleared and rerun. This is useful when you are recovering from a pipeline issue and need to rerun one or more tasks that use a resource independent of the other tasks in the scope.

For example, in the previous DAG, consider a case where worker_task_2 failed and worker_task_3 was unable to run because its upstream task failed. If you clear worker_task_2 by clicking Clear task, the setup task provision_cluster and the teardown task tear_down_cluster are cleared and rerun in addition to worker_task_2, worker_task_3, and downstream_task. This lets you completely recover without needing to rerun worker_task_1 or manually rerun individual tasks.

DAG with setup/ teardown - recovery

Setup/teardown implementation

There are two ways to turn tasks into setup/teardown tasks:

  • Using the .as_setup() and .as_teardown() methods on TaskFlow API tasks or traditional operators.
  • Using the @setup and @teardown decorators on a Python function.

Worker tasks can be added to the scope of a setup/teardown workflow in two ways:

  • By being placed between the setup and teardown tasks in the DAG dependency structure.
  • By using the teardown task's .as_teardown() method as a context manager.

Which method you choose to add worker tasks to a setup/teardown scope is a matter of personal preference.

You can define as many setup and teardown tasks in one DAG as you need. For Airflow to understand which setup and teardown tasks belong together, you need to create setup/teardown workflows.

.as_setup() and .as_teardown() methods

Any individual task can be turned into a setup or teardown task.

To turn a task into a setup task, call the .as_setup() method on the called task object.

Taskflow
@task
def my_setup_task():
    return "Setting up resources!"

my_setup_task_obj = my_setup_task()
my_setup_task_obj.as_setup()

# it is also possible to call `.as_setup()` directly on the function call
# my_setup_task().as_setup()

Setup task decorator

Traditional
def my_setup_task_func():
    return "Setting up resources!"

my_setup_task_obj = PythonOperator(
    task_id="my_setup_task",
    python_callable=my_setup_task_func,
)

my_setup_task_obj.as_setup()

Setup task traditional operator

To turn a task into a teardown task, call the .as_teardown() method on the called task object. Note that you cannot have a teardown task without at least one upstream worker task.

Taskflow
@task
def worker_task():
    return "Doing some work!"

@task
def my_teardown_task():
    return "Tearing down resources!"

my_teardown_task_obj = my_teardown_task()
worker_task() >> my_teardown_task_obj.as_teardown()

# it is also possible to call `.as_teardown()` directly on the function call
# worker_task() >> my_teardown_task().as_teardown()

Teardown task decorator

Traditional
def worker_task_func():
    return "Doing some work!"

worker_task_obj = PythonOperator(
    task_id="worker_task",
    python_callable=worker_task_func,
)

def my_teardown_task_func():
    return "Tearing down resources!"

my_teardown_task_obj = PythonOperator(
    task_id="my_teardown_task",
    python_callable=my_teardown_task_func,
)

worker_task_obj >> my_teardown_task_obj.as_teardown()

Teardown task traditional

After you have defined your setup and teardown tasks you need to define their workflow in order for Airflow to know which setup and teardown tasks perform actions on the same resources.

@setup and @teardown decorators

When working with the TaskFlow API you can also use the @setup and @teardown decorators to turn any Python function into a setup or teardown task.

from airflow.decorators import setup

@setup
def my_setup_task():
    return "Setting up resources!"

my_setup_task()

Setup task decorator

As with the .as_teardown() method you cannot have a @teardown task without at least one upstream worker task. The worker task can use the @task decorator or be defined with a traditional operator.

from airflow.decorators import task, teardown

@task
def worker_task():
    return "Doing some work!"

@teardown
def my_teardown_task():
    return "Tearing down resources!"

worker_task() >> my_teardown_task()

Teardown task decorator

After you have defined your setup and teardown tasks you need to create their workflows in order for Airflow to know which setup and teardown tasks perform actions on the same resources.

Creating setup/teardown workflows

Airflow needs to know which setup and teardown tasks are related based on the resources they manage. Setup and teardown tasks can be defined in the same workflow by:

  • Providing the setup task object to the setups argument in the .as_teardown() method of a teardown task object.
  • Connecting a setup and a teardown task with a normal task dependency using the bit-shift operator (>>) or a dependency function like chain().
  • Providing the called object of a task created using the @setup decorator as an argument to a task created using the @teardown decorator.

Which method you use is a matter of personal preference. However, note that if you are using @setup and @teardown decorators, you cannot use the setups argument.

You can have multiple sets of setup and teardown tasks in a DAG, both in parallel and nested workflows.

There are no limits to how many setup and teardown tasks you can have, nor are there limits to how many worker tasks you can include in their scope.

For example, you could have one task that creates a cluster, a second task that modifies the environment within that cluster, and a third task that tears down the cluster. In this case you could define the first two tasks as setup tasks and the last one as a teardown task, all belonging to the same resource. In a second step, you could add 10 tasks performing actions on that cluster to the scope of the setup/ teardown workflow.
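A sketch of this cluster example could look like the following; the task names are hypothetical:

create_cluster_obj = create_cluster()
modify_cluster_env_obj = modify_cluster_env()

(
    create_cluster_obj
    >> modify_cluster_env_obj
    >> run_workload()  # stand-in for the worker tasks using the cluster
    >> delete_cluster().as_teardown(
        setups=[create_cluster_obj, modify_cluster_env_obj]
    )
)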

There are multiple methods for linking setup and teardown tasks.

Taskflow with setups argument

Using the @task decorator, you can use the .as_teardown() method and the setups argument to define which setup tasks are in the same workflow as the teardown task. Note that it is also possible to use @setup and @teardown decorators instead and link them using direct dependencies.

@task
def my_setup_task():
    return "Setting up resources!"

@task
def worker_task():
    return "Doing some work!"

@task
def my_teardown_task():
    return "Tearing down resources!"

my_setup_task_obj = my_setup_task()

(
    my_setup_task_obj  # .as_setup() does not need to be called anymore
    >> worker_task()
    >> my_teardown_task().as_teardown(setups=my_setup_task_obj)
)

Setup/ teardown method decorator

Traditional with setups argument

If you are using traditional Airflow operators, you can use the .as_teardown() method and the setups argument to define which setup tasks are in the same workflow as the teardown task.

def my_setup_task_func():
    return "Setting up resources!"

def worker_task_func():
    return "Doing some work!"

def my_teardown_task_func():
    return "Tearing down resources!"

my_setup_task_obj = PythonOperator(
    task_id="my_setup_task",
    python_callable=my_setup_task_func,
)

worker_task_obj = PythonOperator(
    task_id="worker_task",
    python_callable=worker_task_func,
)

my_teardown_task_obj = PythonOperator(
    task_id="my_teardown_task",
    python_callable=my_teardown_task_func,
)

(
    my_setup_task_obj  # .as_setup() does not need to be called anymore
    >> worker_task_obj
    >> my_teardown_task_obj.as_teardown(setups=my_setup_task_obj)
)

Setup/ teardown relationships traditional

Taskflow with direct dependencies

Instead of using the setups argument, you can directly link the setup and teardown tasks with a traditional dependency. Whenever you define a direct dependency between a setup and a teardown task, Airflow interprets them as being in the same workflow, no matter what actions the tasks actually perform.

(
    my_setup_task_obj.as_setup()  # calling .as_setup() is necessary
    >> worker_task()
    >> my_teardown_task_obj.as_teardown()
)

my_setup_task_obj >> my_teardown_task_obj

This code creates an identical DAG using the setups argument.

(
    my_setup_task_obj  # .as_setup() is not necessary
    >> worker_task()
    >> my_teardown_task_obj.as_teardown(setups=my_setup_task_obj)
)

Setup/ teardown method decorator

Decorators

With the @setup and @teardown decorators, you can define the setup/teardown workflow between two tasks either by defining direct dependencies or by providing the object of the setup task as an argument to the teardown task.

The latter pattern is often used to pass information like a resource id from the setup task to the teardown task.

from airflow.decorators import task, setup, teardown

@setup
def my_setup_task():
    print("Setting up resources!")
    my_cluster_id = "cluster-2319"
    return my_cluster_id

@task
def worker_task():
    return "Doing some work!"

@teardown
def my_teardown_task(my_cluster_id):
    return f"Tearing down {my_cluster_id}!"

my_setup_task_obj = my_setup_task()
my_setup_task_obj >> worker_task() >> my_teardown_task(my_setup_task_obj)

Setup/ teardown method decorator

Context manager

You can also use the .as_teardown() method as a context manager to wrap a set of tasks that should be in scope of a setup/teardown workflow. The code snippet below shows three tasks in scope of the setup/teardown workflow created by my_cluster_setup_task and my_cluster_teardown_task.

with my_cluster_teardown_task_obj.as_teardown(setups=my_cluster_setup_task_obj):
    worker_task_1() >> [worker_task_2(), worker_task_3()]

Setup/ teardown created using a context manager

Note that a task that was already instantiated outside of the context manager can still be added to the scope, but you have to do this explicitly using the .add_task() method on the context manager object.

# task instantiation outside of the context manager
worker_task_1_obj = worker_task_1()

with my_cluster_teardown_task_obj.as_teardown(
    setups=my_cluster_setup_task_obj
) as my_scope:
    # adding the task to the context manager
    my_scope.add_task(worker_task_1_obj)

Using multiple setup/teardown tasks in one workflow

To define several setup tasks for one teardown task, you can pass a list of setup tasks to the setups argument. You do not need to call .as_setup() on any of the setup tasks.

(
    [my_setup_task_obj_1, my_setup_task_obj_2, my_setup_task_obj_3]
    >> worker_task()
    >> my_teardown_task().as_teardown(
        setups=[my_setup_task_obj_1, my_setup_task_obj_2, my_setup_task_obj_3]
    )
)

Setup/ teardown relationships multiple setup

To define several teardown tasks for one setup task, you have to provide the setup task object to the setups argument of the .as_teardown() method of each teardown task.

(
    my_setup_task_obj
    >> worker_task()
    >> [
        my_teardown_task_obj_1.as_teardown(setups=my_setup_task_obj),
        my_teardown_task_obj_2.as_teardown(setups=my_setup_task_obj),
        my_teardown_task_obj_3.as_teardown(setups=my_setup_task_obj),
    ]
)

Setup/ teardown relationships multiple setup

If your setup/teardown workflow contains more than one setup task and more than one teardown task and you are not using the setups argument, you need to define several dependencies: each setup task must be set as an upstream dependency of each teardown task. The example below shows a setup/teardown workflow containing two setup tasks and two teardown tasks. To define the workflow, you need to set four dependencies.

(
    [my_setup_task_obj_1.as_setup(), my_setup_task_obj_2.as_setup()]
    >> worker_task()
    >> [my_teardown_task_obj_1.as_teardown(), my_teardown_task_obj_2.as_teardown()]
)

# defining the dependency between each setup and each teardown task
my_setup_task_obj_1 >> my_teardown_task_obj_1
my_setup_task_obj_1 >> my_teardown_task_obj_2
my_setup_task_obj_2 >> my_teardown_task_obj_1
my_setup_task_obj_2 >> my_teardown_task_obj_2

This code creates an identical DAG using the setups argument.

(
    [my_setup_task_obj_1, my_setup_task_obj_2]
    >> worker_task()
    >> [
        my_teardown_task_obj_1.as_teardown(
            setups=[my_setup_task_obj_1, my_setup_task_obj_2]
        ),
        my_teardown_task_obj_2.as_teardown(
            setups=[my_setup_task_obj_1, my_setup_task_obj_2]
        ),
    ]
)

Multiple setups/ teardowns

Parallel setup/teardown workflows

You can have several independent sets of setup and teardown tasks in the same DAG. For example, you might have a workflow of tasks that sets up and tears down a cluster and another workflow that sets up and tears down a temporary database.

Decorators
from airflow.decorators import task, setup, teardown

@setup
def my_cluster_setup_task():
    print("Setting up resources!")
    my_cluster_id = "cluster-2319"
    return my_cluster_id

@task
def my_cluster_worker_task():
    return "Doing some work!"

@teardown
def my_cluster_teardown_task(my_cluster_id):
    return f"Tearing down {my_cluster_id}!"

@setup
def my_database_setup_task():
    print("Setting up my database!")
    my_database_name = "DWH"
    return my_database_name

@task
def my_database_worker_task():
    return "Doing some work!"

@teardown
def my_database_teardown_task(my_database_name):
    return f"Tearing down {my_database_name}!"

my_setup_task_obj = my_cluster_setup_task()
(
    my_setup_task_obj
    >> my_cluster_worker_task()
    >> my_cluster_teardown_task(my_setup_task_obj)
)

my_database_setup_obj = my_database_setup_task()
(
    my_database_setup_obj
    >> my_database_worker_task()
    >> my_database_teardown_task(my_database_setup_obj)
)
Methods
@task
def my_cluster_setup_task():
    print("Setting up resources!")
    my_cluster_id = "cluster-2319"
    return my_cluster_id

@task
def my_cluster_worker_task():
    return "Doing some work!"

@task
def my_cluster_teardown_task(my_cluster_id):
    return f"Tearing down {my_cluster_id}!"

@task
def my_database_setup_task():
    print("Setting up my database!")
    my_database_name = "DWH"
    return my_database_name

@task
def my_database_worker_task():
    return "Doing some work!"

@task
def my_database_teardown_task(my_database_name):
    return f"Tearing down {my_database_name}!"

my_setup_task_obj = my_cluster_setup_task()
(
    my_setup_task_obj
    >> my_cluster_worker_task()
    >> my_cluster_teardown_task(my_setup_task_obj).as_teardown(
        setups=my_setup_task_obj
    )
)

my_database_setup_obj = my_database_setup_task()
(
    my_database_setup_obj
    >> my_database_worker_task()
    >> my_database_teardown_task(my_database_setup_obj).as_teardown(
        setups=my_database_setup_obj
    )
)

Parallel groups of setup/ teardown

Nested setup/teardown workflows

You can nest setup and teardown tasks to have an outer and inner scope. This is useful if you have basic resources, such as a cluster that you want to set up once and then tear down after all the work is done, but you also have resources running on that cluster that you want to set up and tear down for individual groups of tasks.

The example below shows the dependency code for a simple structure with an outer and inner setup/ teardown workflow:

  • outer_setup and outer_teardown are the outer setup and teardown tasks.
  • inner_setup and inner_teardown are the inner setup and teardown tasks and both are in scope of the outer setup/ teardown workflow.
  • inner_worker_1 and inner_worker_2 are worker tasks that are in scope of the inner setup/ teardown workflow. All tasks in scope of the inner setup/ teardown workflow will also be in scope of the outer setup/ teardown workflow.
  • outer_worker_1, outer_worker_2, outer_worker_3 are worker tasks that are in scope of the outer setup/ teardown workflow.
outer_setup_obj = outer_setup()
inner_setup_obj = inner_setup()
outer_teardown_obj = outer_teardown()

(
    outer_setup_obj
    >> inner_setup_obj
    >> [inner_worker_1(), inner_worker_2()]
    >> inner_teardown().as_teardown(setups=inner_setup_obj)
    >> [outer_worker_1(), outer_worker_2()]
    >> outer_teardown_obj.as_teardown(setups=outer_setup_obj)
)

outer_setup_obj >> outer_worker_3() >> outer_teardown_obj

Setup/ teardown nesting

Clearing a task will clear all setups and teardowns the task is in scope of, in addition to all downstream tasks. For example:

  • Clearing any of the outer worker tasks (outer_worker_1, outer_worker_2, outer_worker_3) will also clear outer_setup and outer_teardown.
  • Clearing any of the inner worker tasks (inner_worker_1, inner_worker_2) will clear inner_setup, inner_teardown, outer_setup, and outer_teardown. Additionally, outer_worker_1 and outer_worker_2 will be cleared because they are downstream of the inner worker tasks. outer_worker_3 will not be cleared because it runs in parallel to the inner worker tasks.

Narrowing the scope of a setup task

If you have a setup task with no associated teardown task, you can narrow the scope of the setup task by using an empty task as its teardown. For example, if my_worker_task_3_obj does not need the resources created by my_setup_task and should not cause a rerun of the setup task when cleared, you can add an empty teardown task to the dependency chain:

my_setup_task >> [my_worker_task_1_obj, my_worker_task_2_obj] >> my_worker_task_3_obj

[my_worker_task_1_obj, my_worker_task_2_obj] >> EmptyOperator(
    task_id="empty_task"
).as_teardown(setups=my_setup_task)

Example DAG

The DAG shown in this example mimics a setup/teardown pattern that you can run locally. The setup/teardown workflow consists of the following tasks:

  • The create_csv task is a setup task that creates a CSV file in a directory specified as a DAG param.

  • The write_to_csv task is a setup task that writes data to the CSV file.

  • The fetch_data task is a setup task that fetches data from a remote source and writes it to the CSV file.

  • The delete_csv task is the associated teardown task and deletes the resource of the CSV file.

  • The get_average_age task is in scope of the setup/teardown workflow. If this task fails, the DAG still needs to delete the CSV file afterwards (to make the example more realistic, imagine the CSV file is an expensive cluster).

    To recover from a failure when rerunning the get_average_age task, you always need the CSV file to be created again, as well as the data to be fetched again and written to the CSV file. Because the task is in scope of create_csv, write_to_csv, and fetch_data, these tasks will also rerun when you rerun get_average_age.

The DAG contains three tasks that are not in scope of the setup/teardown workflow:

  • The start task is an empty task at the start of the DAG.
  • The report_filepath task prints the path of the CSV file to the logs.
  • The end task is an empty task at the end of the DAG.

This DAG comes with a convenience parameter to test setup/teardown functionality. Toggle fetch_bad_data in the Trigger DAG view to cause bad data to get into the pipeline and the get_average_age task to fail. You will see that delete_csv still runs and deletes the CSV file. In a real-world scenario, after fixing the data issue you would clear the get_average_age task, and all tasks of the setup/teardown workflow would rerun and complete successfully.

Methods
1"""
2## Use `.as_teardown()` in a simple local example to enable setup/teardown functionality
3
4DAG that uses setup/teardown to prepare a CSV file to write to and then showcases the
5behavior in case faulty data is fetched.
6"""
7
8from airflow.decorators import dag, task
9from airflow.models.baseoperator import chain
10from pendulum import datetime
11from airflow.models.param import Param
12from airflow.operators.empty import EmptyOperator
13import os
14import csv
15import time
16
17
18def get_params_helper(**context):
19 folder = context["params"]["folder"]
20 filename = context["params"]["filename"]
21 cols = context["params"]["cols"]
22 return folder, filename, cols
23
24
25@dag(
26 start_date=datetime(2023, 7, 1),
27 schedule=None,
28 catchup=False,
29 params={
30 "folder": "include/my_data",
31 "filename": "data.csv",
32 "cols": ["id", "name", "age"],
33 "fetch_bad_data": Param(False, type="boolean"),
34 },
35 tags=[".is_teardown()", "setup/teardown"],
36)
37def setup_teardown_csv_methods():
38 start = EmptyOperator(task_id="start")
39 end = EmptyOperator(task_id="end")
40
41 @task
42 def report_filepath(**context):
43 folder, filename, cols = get_params_helper(**context)
44 print(f"Filename: {folder}/{filename}")
45
46 @task
47 def create_csv(**context):
48 folder, filename, cols = get_params_helper(**context)
49
50 if not os.path.exists(folder):
51 os.makedirs(folder)
52
53 with open(f"{folder}/{filename}", "w", newline="") as f:
54 writer = csv.writer(f)
55 writer.writerows([cols])
56
57 @task
58 def fetch_data(**context):
59 bad_data = context["params"]["fetch_bad_data"]
60
61 if bad_data:
62 return [
63 [1, "Joe", "Forty"],
64 [2, "Tom", 29],
65 [3, "Lea", 19],
66 ]
67 else:
68 return [
69 [1, "Joe", 40],
70 [2, "Tom", 29],
71 [3, "Lea", 19],
72 ]
73
74 @task
75 def write_to_csv(data, **context):
76 folder, filename, cols = get_params_helper(**context)
77
78 with open(f"{folder}/{filename}", "a", newline="") as f:
79 writer = csv.writer(f)
80 writer.writerows(data)
81
82 time.sleep(10)
83
84 @task
85 def get_average_age(**context):
86 folder, filename, cols = get_params_helper(**context)
87
88 with open(f"{folder}/{filename}", "r", newline="") as f:
89 reader = csv.reader(f)
90 next(reader)
91 ages = [int(row[2]) for row in reader]
92
93 return sum(ages) / len(ages)
94
95 @task
96 def delete_csv(**context):
97 folder, filename, cols = get_params_helper(**context)
98
99 os.remove(f"{folder}/{filename}")
100
101 if not os.listdir(f"{folder}"):
102 os.rmdir(f"{folder}")
103
104 start >> report_filepath() >> end
105
106 create_csv_obj = create_csv()
107 fetch_data_obj = fetch_data()
108 write_to_csv_obj = write_to_csv(fetch_data_obj)
109 get_average_age_obj = get_average_age()
110 delete_csv_obj = delete_csv()
111
112 chain(
113 start,
114 create_csv_obj,
115 write_to_csv_obj,
116 get_average_age_obj,
117 delete_csv_obj.as_teardown(
118 setups=[create_csv_obj, write_to_csv_obj, fetch_data_obj]
119 ),
120 end,
121 )
122
123
124setup_teardown_csv_methods()
Decorators
1"""
2## Use `@setup` and `@teardown` in a simple local example to enable setup/teardown functionality
3
4DAG that uses setup/teardown to prepare a CSV file to write to and then showcases the
5behavior in case faulty data is fetched.
6"""
7
8from airflow.decorators import dag, task, setup, teardown
9from airflow.models.baseoperator import chain
10from pendulum import datetime
11from airflow.models.param import Param
12from airflow.operators.empty import EmptyOperator
13import os
14import csv
15import time
16
17
18def get_params_helper(**context):
19 folder = context["params"]["folder"]
20 filename = context["params"]["filename"]
21 cols = context["params"]["cols"]
22 return folder, filename, cols
23
24
25@dag(
26 start_date=datetime(2023, 7, 1),
27 schedule=None,
28 catchup=False,
29 params={
30 "folder": "include/my_data",
31 "filename": "data.csv",
32 "cols": ["id", "name", "age"],
33 "fetch_bad_data": Param(False, type="boolean"),
34 },
35 tags=["@setup", "@teardown", "setup/teardown"],
36)
37def setup_teardown_csv_decorators():
38 start = EmptyOperator(task_id="start")
39 end = EmptyOperator(task_id="end")
40
41 @task
42 def report_filepath(**context):
43 folder, filename, cols = get_params_helper(**context)
44 print(f"Filename: {folder}/{filename}")
45
46 @setup
47 def create_csv(**context):
48 folder, filename, cols = get_params_helper(**context)
49
50 if not os.path.exists(folder):
51 os.makedirs(folder)
52
53 with open(f"{folder}/{filename}", "w", newline="") as f:
54 writer = csv.writer(f)
55 writer.writerows([cols])
56
57 @setup
58 def fetch_data(**context):
59 bad_data = context["params"]["fetch_bad_data"]
60
61 if bad_data:
62 return [
63 [1, "Joe", "Forty"],
64 [2, "Tom", 29],
65 [3, "Lea", 19],
66 ]
67 else:
68 return [
69 [1, "Joe", 40],
70 [2, "Tom", 29],
71 [3, "Lea", 19],
72 ]
73
74 @setup
75 def write_to_csv(data, **context):
76 folder, filename, cols = get_params_helper(**context)
77
78 with open(f"{folder}/{filename}", "a", newline="") as f:
79 writer = csv.writer(f)
80 writer.writerows(data)
81
82 time.sleep(10)
83
84 @task
85 def get_average_age(**context):
86 folder, filename, cols = get_params_helper(**context)
87
88 with open(f"{folder}/{filename}", "r", newline="") as f:
89 reader = csv.reader(f)
90 next(reader)
91 ages = [int(row[2]) for row in reader]
92
93 return sum(ages) / len(ages)
94
95 @teardown
96 def delete_csv(**context):
97 folder, filename, cols = get_params_helper(**context)
98
99 os.remove(f"{folder}/{filename}")
100
101 if not os.listdir(f"{folder}"):
102 os.rmdir(f"{folder}")
103
104 start >> report_filepath() >> end
105
106 create_csv_obj = create_csv()
107 fetch_data_obj = fetch_data()
108 write_to_csv_obj = write_to_csv(fetch_data_obj)
109 get_average_age_obj = get_average_age()
110 delete_csv_obj = delete_csv()
111
112 chain(
113 start,
114 create_csv_obj,
115 write_to_csv_obj,
116 get_average_age_obj,
117 delete_csv_obj,
118 end,
119 )
120
121 # when using @setup and @teardown the tasks can be linked using normal dependency syntax
122 # or by leveraging task flow (see the complex example)
123 create_csv_obj >> delete_csv_obj
124 fetch_data_obj >> delete_csv_obj
125 write_to_csv_obj >> delete_csv_obj
126
127
128setup_teardown_csv_decorators()

Setup/ teardown example DAG