Airflow task groups

Airflow task groups are a tool to organize tasks into groups within your DAGs. Using task groups allows you to:

  • Organize complicated DAGs, visually grouping tasks that belong together in the Airflow UI Grid View.
  • Apply default_args to sets of tasks, instead of at the DAG level using DAG parameters.
  • Dynamically map over groups of tasks, enabling complex dynamic patterns.
  • Turn task patterns into modules that can be reused across DAGs or Airflow instances.

In this guide, you’ll learn how to create and use task groups in your DAGs. You can find many example DAGs using task groups on the Astronomer GitHub.

Task group intro gif

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

  • Airflow operators.
  • Airflow decorators.

When to use task groups

Task groups are most often used to visually organize complicated DAGs. For example, you might use task groups:

  • In big ELT/ETL DAGs, where you have a task group per table or schema.
  • In MLOps DAGs, where you have a task group per model being trained.
  • In DAGs owned by several teams, where you have task groups to visually separate the tasks that belong to each team. Although in this case, it might be better to separate the DAG into multiple DAGs and use Assets to connect them.
  • When you are using the same patterns of tasks in multiple DAGs and want to create a reusable module.
  • When you have an input of unknown length, for example an unknown number of files in a directory. You can use task groups to dynamically map over the input and create a task group performing sets of actions for each file. This is the only way to dynamically map sequential tasks in Airflow. See the sketch after this list.
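
As a minimal sketch of that last pattern, assuming a hypothetical directory /data/incoming whose contents are listed at runtime, each file gets its own mapped task group instance running two sequential tasks:

import os

from airflow.decorators import dag, task, task_group
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def process_files_example():
    @task
    def list_files():
        # hypothetical directory; the number of files is unknown in advance
        return os.listdir("/data/incoming")

    @task_group
    def process_file(file_name):
        @task
        def validate(f):
            return f

        @task
        def load(f):
            print(f"loading {f}")

        # the two tasks in each mapped task group instance run sequentially
        load(validate(file_name))

    # one mapped task group instance per file
    process_file.expand(file_name=list_files())


process_files_example()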

Define task groups

There are two ways to define task groups in your DAGs:

  • Use the TaskGroup class to create a task group context.
  • Use the @task_group decorator on a Python function.

In most cases, it is a matter of personal preference which method you use. The only exception is when you want to dynamically map over a task group; this is possible only when using @task_group.

The following code shows how to instantiate a simple task group containing two sequential tasks. You can use dependency operators (<< and >>) both within and between task groups in the same way that you can with individual tasks.

Decorator
# from airflow.decorators import task_group
# from airflow.operators.empty import EmptyOperator

t0 = EmptyOperator(task_id='start')

# Start task group definition
@task_group(group_id='my_task_group')
def tg1():
    t1 = EmptyOperator(task_id='task_1')
    t2 = EmptyOperator(task_id='task_2')

    t1 >> t2
# End task group definition

t3 = EmptyOperator(task_id='end')

# Set task group's (tg1) dependencies
t0 >> tg1() >> t3
Context
# from airflow.utils.task_group import TaskGroup
# from airflow.operators.empty import EmptyOperator

t0 = EmptyOperator(task_id='start')

# Start task group definition
with TaskGroup(group_id='my_task_group') as tg1:
    t1 = EmptyOperator(task_id='task_1')
    t2 = EmptyOperator(task_id='task_2')

    t1 >> t2
# End task group definition

t3 = EmptyOperator(task_id='end')

# Set task group's (tg1) dependencies
t0 >> tg1 >> t3

Task groups are shown in both the Grid and the Graph of your DAG:

Task groups simple example - grid

Task groups simple example - graph

Task group parameters

You can use parameters to customize individual task groups. The two most important parameters are group_id, which determines the name of your task group, and default_args, which is passed to all tasks in the task group. The following examples show task groups with some commonly configured parameters:

Decorator
@task_group(
    group_id="task_group_1",
    default_args={"conn_id": "postgres_default"},
    tooltip="This task group is very important!",
    prefix_group_id=True,
    # parent_group=None,
    # dag=None,
)
def tg1():
    t1 = EmptyOperator(task_id="t1")

tg1()
Context
with TaskGroup(
    group_id="task_group_2",
    default_args={"conn_id": "postgres_default"},
    tooltip="This task group is also very important!",
    prefix_group_id=True,
    # parent_group=None,
    # dag=None,
    # add_suffix_on_collision=True, # resolves group_id collisions by adding a suffix
) as tg2:
    t1 = EmptyOperator(task_id="t1")
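
Note that a task group's default_args only provide defaults: an argument set directly on a task overrides the group-level value. The following is a minimal sketch, assuming a hypothetical connection ID my_other_conn:

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.task_group import TaskGroup

with TaskGroup(
    group_id="task_group_3",
    default_args={"conn_id": "postgres_default"},
) as tg3:
    # conn_id is inherited from the task group's default_args
    t1 = SQLExecuteQueryOperator(task_id="t1", sql="SELECT 1;")

    # an argument set directly on the task overrides the group default
    t2 = SQLExecuteQueryOperator(
        task_id="t2",
        sql="SELECT 1;",
        conn_id="my_other_conn",  # hypothetical connection ID
    )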

task_id in task groups

When your task is within a task group, its task_id is prefixed with the group's group_id, resulting in group_id.task_id. This ensures that the task_id is unique across the DAG. It is important that you use this full ID when referring to specific tasks, for example when working with XComs or branching. You can disable this behavior by setting the task group parameter prefix_group_id=False.

For example, the task_1 task in the following DAG has a task_id of my_outer_task_group.my_inner_task_group.task_1.

Decorator
@task_group(group_id="my_outer_task_group")
def my_outer_task_group():
    @task_group(group_id="my_inner_task_group")
    def my_inner_task_group():
        EmptyOperator(task_id="task_1")

    my_inner_task_group()

my_outer_task_group()
Context
with TaskGroup(group_id="my_outer_task_group") as tg1:
    with TaskGroup(group_id="my_inner_task_group") as tg2:
        EmptyOperator(task_id="task_1")
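
Because of this prefixing, you must use the full ID whenever you reference a task that lives inside a group. The following is a minimal sketch, assuming the my_task_group group from the simple example above; both the branching task and the XCom pull reference task_1 by its prefixed ID:

from airflow.decorators import task


@task.branch
def choose_branch():
    # return the prefixed ID, not just "task_1"
    return "my_task_group.task_1"


@task
def print_result(**context):
    # pull an XCom from a task inside a task group using its full ID
    result = context["ti"].xcom_pull(task_ids="my_task_group.task_1")
    print(result)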

Passing data through task groups

When you use the @task_group decorator, you can pass data through the task group just like with regular @task decorators:

from airflow.decorators import dag, task, task_group
from pendulum import datetime
import json


@dag(start_date=datetime(2023, 8, 1), schedule=None, catchup=False)
def task_group_example():
    @task
    def extract_data():
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        return order_data_dict

    @task
    def transform_sum(order_data_dict: dict):
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task
    def transform_avg(order_data_dict: dict):
        total_order_value = 0
        for value in order_data_dict.values():
            total_order_value += value
        avg_order_value = total_order_value / len(order_data_dict)

        return {"avg_order_value": avg_order_value}

    @task_group
    def transform_values(order_data_dict):
        return {
            "avg": transform_avg(order_data_dict),
            "total": transform_sum(order_data_dict),
        }

    @task
    def load(order_values: dict):
        print(
            f"""Total order value is: {order_values['total']['total_order_value']:.2f}
            and average order value is: {order_values['avg']['avg_order_value']:.2f}"""
        )

    load(transform_values(extract_data()))


task_group_example()

The resulting DAG is shown in the following image:

Decorated task group

There are a few things to consider when passing information into and out of task groups:

  • If downstream tasks require the output of tasks that are in the task group decorator, then the task group function must return a result. In the previous example, a dictionary with two values is returned, one from each of the tasks in the task group, and is then passed to the downstream load() task.
  • If your task group function returns an output that another task takes as an input, Airflow can infer the task group and task dependency with the TaskFlow API. If your task group function's output isn't used as a task input, you must use the bit-shift operators (<< or >>) to define downstream dependencies to the task group, as shown in the sketch after this list.
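
The following is a minimal sketch of that second case, assuming a task group whose output is not consumed by any downstream task, so the dependency must be set explicitly:

from airflow.decorators import task, task_group
from airflow.operators.empty import EmptyOperator


@task_group
def standalone_task_group():
    EmptyOperator(task_id="t1")


@task
def downstream_task():
    print("runs after the task group")


# no output of standalone_task_group is used as a task input, so Airflow
# cannot infer the dependency; set it explicitly with a bit-shift operator
standalone_task_group() >> downstream_task()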

Generate task groups dynamically at runtime

You can use dynamic task mapping with the @task_group decorator to dynamically map over task groups. The following DAG shows how you can dynamically map over a task group with different inputs for a given parameter.

from airflow.decorators import dag, task_group, task
from pendulum import datetime


@dag(
    start_date=datetime(2022, 12, 1),
    schedule=None,
    catchup=False,
)
def task_group_mapping_example():
    # creating a task group using the decorator with the dynamic input my_num
    @task_group(group_id="group1")
    def tg1(my_num):
        @task
        def print_num(num):
            return num

        @task
        def add_42(num):
            return num + 42

        print_num(my_num) >> add_42(my_num)

    # a downstream task to print out resulting XComs
    @task
    def pull_xcom(**context):
        pulled_xcom = context["ti"].xcom_pull(
            # reference a task in a task group with task_group_id.task_id
            task_ids=["group1.add_42"],
            # only pull XComs from specific mapped task group instances (2.5 feature)
            map_indexes=[2, 3],
            key="return_value",
        )

        # will print out a list of results from map index 2 and 3 of the add_42 task
        print(pulled_xcom)

    # creating 6 mapped task group instances of the task group group1 (2.5 feature)
    tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])

    # setting dependencies
    tg1_object >> pull_xcom()


task_group_mapping_example()

This DAG dynamically maps over the task group group1 with different inputs for the my_num parameter. Six mapped task group instances are created, one for each input. Within each mapped task group instance, two tasks run using that instance's value of my_num as an input. The pull_xcom() task downstream of the dynamically mapped task group shows how to access a specific XCom value from a list of mapped task group instances (map_indexes).

For more information on dynamic task mapping, including how to map over multiple parameters, see Dynamic Tasks.
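
As a brief illustration of mapping over multiple parameters, passing several keyword arguments to .expand() creates one mapped task group instance per combination of inputs (a cross-product). The following is a minimal sketch with a hypothetical task group tg2 that takes two parameters:

from airflow.decorators import task, task_group


# a hypothetical task group accepting two parameters
@task_group(group_id="group2")
def tg2(my_num, my_letter):
    @task
    def combine(num, letter):
        return f"{letter}{num}"

    combine(my_num, my_letter)


# expanding over two keyword arguments creates one mapped task group
# instance per input combination: 3 x 2 = 6 instances here
tg2.expand(my_num=[1, 2, 3], my_letter=["a", "b"])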

Order task groups

By default, task groups generated in a loop run in parallel. If one task group depends on elements of another, you'll want to run them sequentially. For example, when loading tables with foreign keys, your primary table records need to exist before you can load your foreign table.

In the following example, the third task group generated in the loop has a foreign key constraint on both previously generated task groups (first and second iteration of the loop), so you’ll want to process it last. To do this, you’ll create an empty list and append your task group objects as they are generated. Using this list, you can reference the task groups and define their dependencies to each other:

Taskflow
groups = []
for g_id in range(1, 4):
    tg_id = f"group{g_id}"

    @task_group(group_id=tg_id)
    def tg1():
        t1 = EmptyOperator(task_id="task1")
        t2 = EmptyOperator(task_id="task2")

        t1 >> t2

        if tg_id == "group1":
            t3 = EmptyOperator(task_id="task3")
            t1 >> t3

    groups.append(tg1())

[groups[0], groups[1]] >> groups[2]
Traditional
groups = []
for g_id in range(1, 4):
    tg_id = f"group{g_id}"
    with TaskGroup(group_id=tg_id) as tg1:
        t1 = EmptyOperator(task_id="task1")
        t2 = EmptyOperator(task_id="task2")

        t1 >> t2

        if tg_id == "group1":
            t3 = EmptyOperator(task_id="task3")
            t1 >> t3

    groups.append(tg1)

[groups[0], groups[1]] >> groups[2]

The following image shows how these task groups appear in the Airflow UI:

Task group Dependencies

This example also shows how to add an additional task to group1 based on the group_id. Even when you're creating task groups in a loop to take advantage of patterns, you can still introduce variations to the pattern while avoiding code redundancies.

Nest task groups

For additional complexity, you can nest task groups by defining a task group indented within another task group. There is no limit to how many levels of nesting you can have.

Taskflow
groups = []
for g_id in range(1, 3):
    @task_group(group_id=f"group{g_id}")
    def tg1():
        t1 = EmptyOperator(task_id="task1")
        t2 = EmptyOperator(task_id="task2")

        sub_groups = []
        for s_id in range(1, 3):
            @task_group(group_id=f"sub_group{s_id}")
            def tg2():
                st1 = EmptyOperator(task_id="task1")
                st2 = EmptyOperator(task_id="task2")

                st1 >> st2

            sub_groups.append(tg2())

        t1 >> sub_groups >> t2

    groups.append(tg1())

groups[0] >> groups[1]
Traditional
groups = []
for g_id in range(1, 3):
    with TaskGroup(group_id=f"group{g_id}") as tg1:
        t1 = EmptyOperator(task_id="task1")
        t2 = EmptyOperator(task_id="task2")

        sub_groups = []
        for s_id in range(1, 3):
            with TaskGroup(group_id=f"sub_group{s_id}") as tg2:
                st1 = EmptyOperator(task_id="task1")
                st2 = EmptyOperator(task_id="task2")

                st1 >> st2

            sub_groups.append(tg2)

        t1 >> sub_groups >> t2

    groups.append(tg1)

groups[0] >> groups[1]

The following image shows the expanded view of the nested task groups in the Airflow UI:

Nested task groups

Custom task group classes

If you use the same patterns of tasks in several DAGs or Airflow instances, it may be useful to create a custom task group class module. To do so, you need to inherit from the TaskGroup class and then define your tasks within that custom class. You also need to use self to assign the task to the task group. Other than that, the task definitions will be the same as if you were defining them in a DAG file.

from airflow.utils.task_group import TaskGroup
from airflow.decorators import task


class MyCustomMathTaskGroup(TaskGroup):
    """A task group summing two numbers and multiplying the result with 23."""

    # defining defaults of input arguments num1 and num2
    def __init__(self, group_id, num1=0, num2=0, tooltip="Math!", **kwargs):
        """Instantiate a MyCustomMathTaskGroup."""
        super().__init__(group_id=group_id, tooltip=tooltip, **kwargs)

        # assign the task to the task group by using `self`
        @task(task_group=self)
        def task_1(num1, num2):
            """Adds two numbers."""
            return num1 + num2

        @task(task_group=self)
        def task_2(num):
            """Multiplies a number by 23."""
            return num * 23

        # define dependencies
        task_2(task_1(num1, num2))

In the DAG, you import your custom TaskGroup class and instantiate it with the values for your custom arguments:

from airflow.decorators import dag, task
from pendulum import datetime
from include.custom_task_group import MyCustomMathTaskGroup


@dag(
    start_date=datetime(2023, 8, 1),
    schedule=None,
    catchup=False,
    tags=["@task_group", "task_group"],
)
def custom_tg():
    @task
    def get_num_1():
        return 5

    tg1 = MyCustomMathTaskGroup(group_id="my_task_group", num1=get_num_1(), num2=19)

    @task
    def downstream_task():
        return "hello"

    tg1 >> downstream_task()


custom_tg()

The resulting DAG is shown in the following image. The custom, templated task group can now be reused in other DAGs with different inputs for num1 and num2.

Custom task group