Human-in-the-loop workflows with Airflow

Human-in-the-loop (HITL) workflows are processes that require human intervention, for example to approve or reject an AI-generated output, or to choose a branch in a dag depending on the result of an upstream task. The Airflow standard provider contains a set of operators that create tasks which wait for human input, given either in the Airflow UI or through the Airflow REST API. The human-in-the-loop operators require Airflow 3.1 or later.

This guide covers the available HITL operators, as well as how to interact with them in the UI and via the API.

Gif showing a human-in-the-loop workflow in the Airflow UI.

Assumed knowledge

To get the most out of this guide, you should have existing knowledge of:

When to use human-in-the-loop workflows

Human-in-the-loop workflows are useful whenever a dag needs input from a human (or another entity outside of Airflow). For example:

  • Your dag uses AI to create responses to a support ticket, and you want to ask a human to review the response and approve it or request changes.
  • Your dag generates a compliance report, and you need a human to verify and acknowledge the results.
  • You’d like to use AI to route product feature requests to the appropriate team and need a human to decide where to send edge cases.
  • You have a dag that requires input from a domain expert, for example feedback gathered in user research interviews.

Human-in-the-loop workflows are very common, especially as AI is increasingly used to generate assets that need human review. The Airflow HITL features allow you to build these workflows and let non-technical team members provide their input, either in the Airflow UI or through a custom application built around the relevant Airflow REST API endpoints.

Required Actions

Each task instance of a human-in-the-loop operator creates a Required Action object. You can view a list of all required actions, pending and resolved, for your whole Airflow instance under Browse > Required Actions in the Airflow UI.

Screenshot of the Airflow UI showing the list of Required Actions under Browse -> Required Actions.

To respond to a required action you can either:

  • Navigate to the Required Actions tab of the task instance page and respond directly in the UI.
  • Send a request to the Airflow REST API’s update HITL detail endpoint (PATCH api/v2/hitlDetails/{dag_id}/{dag_run_id}/{task_id}).

Human-in-the-loop operators

There are four human-in-the-loop operators available in the Airflow standard provider:

  • HITLOperator: The base class for all human-in-the-loop operators.
  • ApprovalOperator: A specialised form of the HITLOperator where the two options are “Approve” and “Reject”.
  • HITLBranchOperator: A specialised form of the HITLOperator where the user input provides a branching decision, choosing the task or set of tasks to run next.
  • HITLEntryOperator: A specialised form of the HITLOperator where the user provides input to a form.

All human-in-the-loop operators are deferrable operators: while waiting for human input, they release their worker slot and an asynchronous process runs in the Triggerer component.

When choosing which human-in-the-loop operator to use, consider the following:

  • If the human-in-the-loop action centers around a branching decision (choosing the next task(s) to run), use the HITLBranchOperator.
  • If you are looking to make a binary decision (approval or rejection of information displayed at runtime), use the ApprovalOperator. If users need to provide additional input, add form fields using params.
  • If you are mostly interested in letting users provide input to a form, use the HITLEntryOperator.
  • For all other use cases, you can use the HITLOperator directly. It allows you to display custom options for your users to choose from, as well as additional input fields with params.

In many cases, you’ll combine these operators to build your human-in-the-loop workflow. For example, an upstream HITLBranchOperator can let a human choose whether to accept an AI-generated ticket response or escalate the ticket to a human, and a downstream HITLEntryOperator can then collect that human’s answer. Both the user’s decision (chosen_options) and any input to params (params_input) are pushed to XCom, so downstream tasks can pull and use them.
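Because the decision and the form input arrive together, downstream logic usually reads both keys. The sketch below is a plain-Python illustration, not part of the Airflow API: summarize_decision is a hypothetical helper, and it assumes the XCom value is a dictionary with a chosen_options list and a params_input dictionary, which are the keys the print_result tasks in this guide access. In a dag, you would call similar logic inside a @task that receives the HITL task’s .output.

```python
from typing import Any


def summarize_decision(hitl_output: dict[str, Any]) -> str:
    """Build a one-line summary from a HITL task's XCom value (hypothetical helper)."""
    # chosen_options is assumed to be a list, even when only one option can be picked
    chosen = hitl_output.get("chosen_options") or []
    # params_input is assumed to map each param name to the submitted value
    params = hitl_output.get("params_input") or {}
    if not chosen:
        return "No option chosen"
    param_text = ", ".join(f"{key}={value}" for key, value in sorted(params.items()))
    summary = f"Chosen: {', '.join(chosen)}"
    return f"{summary} ({param_text})" if param_text else summary


print(summarize_decision({"chosen_options": ["ACH Transfer"], "params_input": {"expense_amount": 10000}}))
# prints: Chosen: ACH Transfer (expense_amount=10000)
```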

HITLOperator

The HITLOperator is the base class for all human-in-the-loop operators and the most versatile operator in this family. With it, you can display information, let the user choose one or more options from a list, and accept additional input through a form based on Airflow params.

Two parameters are mandatory when instantiating the HITLOperator:

  • subject (required): The subject of the required action, which is displayed as its title in the Required Actions tab. This field is templatable, which means you can use Jinja templates to render information at runtime, including information computed by an upstream task.
  • options (required): A list of strings that are rendered as response options at the bottom of the required action form.

There are also several optional parameters that you can use to further configure the behavior of the HITLOperator:

  • body: The main text body. This field is templatable as well and supports Markdown formatting.
  • defaults: A list of one or more options that are selected automatically if the task times out before a human responds. All default options must also be in the options list.
  • multiple: If set to True, the user can select multiple options. Default is False.
  • params: Lets you create form fields for user input to the required action using Airflow params. Note that not all param functionality is supported for human-in-the-loop operators.
  • execution_timeout: A BaseOperator parameter that times out the task after a specified duration, provided as a datetime.timedelta or pendulum.Duration object. Default is None. After the timeout is reached, the behavior depends on whether you provided a defaults list:
    • If you provided a defaults list, the default options are chosen as the response and the task succeeds.
    • If you did not provide a defaults list, the task fails.
  • assigned_users: A list of the users who are allowed to respond to the required action. Users are provided as HITLUser objects (from airflow.sdk.execution_time.hitl import HITLUser) with id and name fields.
    • If you are running Airflow on Astro, the id of each user is their Astro ID in the format cl1a2b3cd456789ef1gh2ijkl3. You can find each user’s Astro ID under Organization -> Access Management.
    • If you are using the SimpleAuthManager the id of each user is their username.
    • If you are using the FabAuthManager the id of each user is their email.
  • notifiers: A list of notifiers whose .notify() method is executed when the task starts running. See Use notifiers with HITL operators for an example.

The following example shows a simple use of the HITLOperator: the output of an upstream task is pulled from XCom and displayed in the body, and the user is given three response options to choose from. One additional input field, expense_amount, is rendered in the Airflow UI using an Airflow param. If no one responds within 5 minutes (execution_timeout), the task times out and uses the defaults: ACH Transfer as the response and 10000 as the expense amount.

The downstream print_result task prints the information that the HITLOperator task pushed to XCom: the chosen_options and params_input values.

from datetime import timedelta

from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import dag, task, chain, Param


@dag
def HITLOperator_syntax_example():

    @task
    def upstream_task():
        return "Review expense report and approve vendor payment method."

    _upstream_task = upstream_task()

    _hitl_task = HITLOperator(
        task_id="hitl_task",
        subject="Expense Approval Required",  # templatable
        body="{{ ti.xcom_pull(task_ids='upstream_task') }}",  # templatable
        options=["ACH Transfer", "Wire Transfer", "Corporate Check"],
        defaults=["ACH Transfer"],
        multiple=False,  # default: False
        params={
            "expense_amount": Param(
                10000,
                type="number",
            )
        },
        execution_timeout=timedelta(minutes=5),  # default: None
    )

    @task
    def print_result(hitl_output):
        print(f"Expense amount: ${hitl_output['params_input']['expense_amount']}")
        print(f"Payment method: {hitl_output['chosen_options']}")

    _print_result = print_result(_hitl_task.output)

    chain(_upstream_task, _hitl_task)


HITLOperator_syntax_example()

The open required action form is displayed on the task instance page’s Required Actions tab.

Screenshot of the Airflow UI showing the HITLOperator task instance page with the Required Actions tab open.
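The timeout rules for defaults and execution_timeout described above can be summarized in a few lines of plain Python. This is a model of the documented behavior, not Airflow’s implementation: validate_defaults, resolve_on_timeout, and HITLTimeoutError are hypothetical names, and the real operator enforces these rules on its own.

```python
from typing import Optional


class HITLTimeoutError(Exception):
    """Hypothetical stand-in for the task failing on timeout without defaults."""


def validate_defaults(options: list[str], defaults: Optional[list[str]]) -> None:
    # Every default must also appear in the options list
    missing = [default for default in (defaults or []) if default not in options]
    if missing:
        raise ValueError(f"defaults not in options: {missing}")


def resolve_on_timeout(options: list[str], defaults: Optional[list[str]]) -> list[str]:
    """Return the response used when execution_timeout is reached (model only)."""
    validate_defaults(options, defaults)
    if defaults:
        # With defaults: they become the response and the task succeeds
        return list(defaults)
    # Without defaults: the task fails
    raise HITLTimeoutError("timed out with no response and no defaults")


print(resolve_on_timeout(["ACH Transfer", "Wire Transfer", "Corporate Check"], ["ACH Transfer"]))
# prints: ['ACH Transfer']
```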

ApprovalOperator

The ApprovalOperator is a specialised form of the HITLOperator where the only two possible response options are Approve and Reject. You can also add form fields for additional user input using Airflow params.

If the human chooses Approve, the task succeeds. If the human chooses Reject, the task also succeeds, but all downstream tasks are skipped.

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param


@dag
def ApprovalOperator_syntax_example():

    @task
    def upstream_task():
        return "Pineapple on pizza?"

    _upstream_task = upstream_task()

    _hitl_task = ApprovalOperator(
        task_id="approval_task",
        subject="Your task:",
        body="{{ ti.xcom_pull(task_ids='upstream_task') }}",
        defaults="Approve",  # other option: "Reject"
        params={
            "second_topping": Param(
                "olives",
                type="string",
            )
        },
    )

    @task
    def print_result(hitl_output):
        print(f"Params input: {hitl_output['params_input']}")
        print(f"Chosen options: {hitl_output['chosen_options']}")

    _print_result = print_result(_hitl_task.output)

    chain(_upstream_task, _hitl_task)


ApprovalOperator_syntax_example()

The action form shows the two options Approve and Reject alongside any param form fields.

Screenshot of the Airflow UI showing the ApprovalOperator task instance page with the Required Actions tab open.

HITLBranchOperator

If you want to branch your dag based on the human input, you can use the HITLBranchOperator. This operator allows the user to choose one or more tasks that are directly downstream of the HITLBranchOperator task to run next. All tasks that are not chosen will be skipped.

You can use the options_mapping parameter to map the human-facing options to the IDs of the tasks downstream of the HITLBranchOperator task.

from airflow.providers.standard.operators.hitl import HITLBranchOperator
from airflow.sdk import dag, task, chain


_budget_categories = ["marketing", "research_development", "facilities", "training", "technology"]


@dag
def HITLBranchOperator_syntax_example():

    @task
    def upstream_task():
        return {
            "total_budget": "$4B",
        }

    _upstream_task = upstream_task()

    _hitl_branch_task = HITLBranchOperator(
        task_id="hitl_branch_task",
        subject="Budget Category Approval",
        body="""**Total Budget Available:** {{ ti.xcom_pull(task_ids='upstream_task')['total_budget'] }}

Select the funding proposals to approve for this quarter.""",
        options=_budget_categories,
        defaults=["marketing", "research_development"],
        multiple=True,
    )

    for _category in _budget_categories:

        @task(
            task_id=f"{_category}",  # needs to match options in HITLBranchOperator
        )
        def category_task(category: str):
            print(f"Processing budget approval for {category}")

        # Pass the category as an argument: a closure over _category would make
        # every task print the loop's last value ("technology") at runtime
        _category_task = category_task(_category)
        chain(_hitl_branch_task, _category_task)

    chain(_upstream_task, _hitl_branch_task)


HITLBranchOperator_syntax_example()

The screenshot below shows the graph view created by the code snippet above with 5 tasks downstream of the HITLBranchOperator task and the Required Actions tab showing the form input.

Screenshot of the Airflow UI showing the HITLBranchOperator task instance page with the Required Actions tab open.

After the budget for three of the categories is approved, the dag run completes with three downstream tasks run and two skipped.

Screenshot of the Airflow UI showing the completed dag run, with three category tasks run and two skipped.
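The example above uses the task IDs themselves as options. If you’d rather show friendlier labels, options_mapping maps each human-facing option to a downstream task ID. The sketch below assumes a dictionary of label to task ID for this dag’s categories and models, in plain Python, which tasks would run and which would be skipped for a given answer. The labels and split_branches are hypothetical, and the real operator performs the branching itself; in the dag, you would presumably pass the labels as options and the dictionary as options_mapping.

```python
# Hypothetical human-facing labels mapped to the task IDs from the example above
options_mapping = {
    "Marketing": "marketing",
    "Research & Development": "research_development",
    "Facilities": "facilities",
    "Training": "training",
    "Technology": "technology",
}


def split_branches(chosen_options: list[str], mapping: dict[str, str]) -> tuple[list[str], list[str]]:
    """Return (task IDs to run, task IDs to skip) for the chosen labels (model only)."""
    unknown = [label for label in chosen_options if label not in mapping]
    if unknown:
        raise ValueError(f"unknown options: {unknown}")
    to_run = [mapping[label] for label in chosen_options]
    # Every mapped task that was not chosen would be skipped
    to_skip = [task_id for task_id in mapping.values() if task_id not in to_run]
    return to_run, to_skip


run, skip = split_branches(["Marketing", "Training", "Technology"], options_mapping)
print(run)   # ['marketing', 'training', 'technology']
print(skip)  # ['research_development', 'facilities']
```

This mirrors the screenshot above: three chosen categories run and the remaining two are skipped.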

HITLEntryOperator

The HITLEntryOperator is a specialised form of the HITLOperator where the user provides input to a form and then submits the input without choosing from a list of options.

from airflow.providers.standard.operators.hitl import HITLEntryOperator
from airflow.sdk import dag, task, chain, Param


@dag
def HITLEntryOperator_syntax_example():

    @task
    def upstream_task():
        return "How can I auto-pause a dag if it fails?"

    _upstream_task = upstream_task()

    _hitl_task = HITLEntryOperator(
        task_id="hitl_task",
        subject="Please respond to this ticket!",
        body="{{ ti.xcom_pull(task_ids='upstream_task') }}",
        params={
            "response": Param(
                "You can use the max_consecutive_failed_dag_runs parameter! :)",
                type="string",
            ),
            "urgency": Param(
                "p3",
                type="string",
            ),
        },
    )

    @task
    def print_result(hitl_output):
        print(f"Params input: {hitl_output['params_input']}")
        print(f"Chosen options: {hitl_output['chosen_options']}")

    _print_result = print_result(_hitl_task.output)

    chain(_upstream_task, _hitl_task)


HITLEntryOperator_syntax_example()

Screenshot of the Airflow UI showing the HITLEntryOperator task instance page with the Required Actions tab open.

Use notifiers with HITL operators

You can use an Airflow notifier to send information from a human-in-the-loop operator to another system, such as Slack or email. The notifier’s .notify() method is executed when the task starts running. A simple approach is to use the HITLOperator.generate_link_to_ui_from_context method to generate a link to the required action in the Airflow UI, which users can click to respond.

The code snippet below shows a sample notifier MyNotifier that prints the required action information and the link to the required action to the Airflow logs.

from airflow.sdk import BaseNotifier, Context, dag
from airflow.providers.standard.operators.hitl import HITLOperator

_BASE_URL = "http://localhost:28080"


class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message: str = "") -> None:
        self.message = message

    def notify(self, context: Context) -> None:

        task_state = context["ti"].state
        if task_state == "running":

            # this method generates a direct link to the UI page where the user can respond
            url = HITLOperator.generate_link_to_ui_from_context(
                context=context,
                base_url=_BASE_URL,
            )

            # placeholder code, you can send the URL to any service you want
            self.log.info(self.message)
            self.log.info("Url to respond %s", url)
        else:
            self.log.info("Task state: %s", task_state)
            self.log.info("No response needed!")


notifier_class = MyNotifier(
    message="""
Subject: {{ task.subject }}
Body: {{ task.body }}
Options: {{ task.options }}
"""
)


@dag
def notifier_example():
    HITLOperator(
        task_id="hitl_task",
        subject="Choose a number: ",
        options=["23", "19", "42"],
        notifiers=[notifier_class],
    )


notifier_example()

You can also add the regular callbacks, such as on_failure_callback and on_success_callback, to human-in-the-loop operators.

Human-in-the-loop API endpoints

If your human (or other entity) does not have access to the Airflow UI, you can use the Airflow REST API to poll for required actions and respond to them.

The relevant endpoints are:

  • GET api/v2/hitlDetails/ to get a list of required actions in an Airflow instance, which can be filtered by fields such as state and dag_id.
  • GET api/v2/hitlDetails/{dag_id}/{dag_run_id}/{task_id} to get the details of a specific required action.
  • PATCH api/v2/hitlDetails/{dag_id}/{dag_run_id}/{task_id} to respond to a specific required action.
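The detail and update endpoints take the dag ID, run ID, and task ID as path segments. Run IDs often contain characters such as : and +, for example manual__2025-01-01T00:00:00+00:00, so percent-encoding each segment is a defensive choice. The following sketch uses urllib.parse.quote; hitl_detail_url is a hypothetical helper, and you could use it wherever the script below builds these URLs with f-strings.

```python
from urllib.parse import quote


def hitl_detail_url(host: str, dag_id: str, dag_run_id: str, task_id: str) -> str:
    """Build the GET/PATCH URL for one required action, percent-encoding each path segment."""
    segments = [quote(part, safe="") for part in (dag_id, dag_run_id, task_id)]
    # rstrip avoids a double slash if the host is configured with a trailing "/"
    return f"{host.rstrip('/')}/api/v2/hitlDetails/" + "/".join(segments)


print(hitl_detail_url("http://localhost:28080", "HITLOperator_syntax_example", "manual__2025-01-01T00:00:00+00:00", "hitl_task"))
# prints: http://localhost:28080/api/v2/hitlDetails/HITLOperator_syntax_example/manual__2025-01-01T00%3A00%3A00%2B00%3A00/hitl_task
```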

These API calls can be combined with others to create scripts like the one below, which lets you respond from the command line to the pending required actions of one task across all running dag runs of a specific dag.

import requests
from typing import Any

_USERNAME = "admin"
_PASSWORD = "admin"
# No trailing slash: the URLs below already start with "/".
# To learn how to send API requests to Airflow running on Astro see: https://www.astronomer.io/docs/astro/airflow-api/
_HOST = "http://localhost:28080"

_DAG_ID = "HITLOperator_syntax_example"
_TASK_ID = "hitl_task"


def _pick_option(options: list[str]):
    print("Available options: ", options)
    chosen_option = input("Enter the option you want to select: ")
    return chosen_option


def _pick_params(param_name: str):
    print("Input for param:", param_name)
    param_input = input("Enter your value for the param: ")
    return param_input


def _get_jwt_token():
    token_url = f"{_HOST}/auth/token"
    payload = {"username": _USERNAME, "password": _PASSWORD}
    headers = {"Content-Type": "application/json"}
    response = requests.post(token_url, json=payload, headers=headers)

    token = response.json().get("access_token")
    return token


def _get_running_dagruns_for_dag(dag_id: str):
    url = f"{_HOST}/api/v2/dags/{dag_id}/dagRuns?state=running"
    headers = {"Authorization": f"Bearer {_get_jwt_token()}"}
    response = requests.get(url, headers=headers)
    return response.json()


def _get_hitl_details(dag_id: str, dag_run_id: str, task_id: str):
    url = f"{_HOST}/api/v2/hitlDetails/{dag_id}/{dag_run_id}/{task_id}"
    headers = {"Authorization": f"Bearer {_get_jwt_token()}"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        subject = response.json()["subject"]
        body = response.json()["body"]
        options = response.json()["options"]
        params = response.json()["params"]
        print("--------------------------------")
        print("Required Action found for: ", dag_id, "DAG Run: ", dag_run_id, "Task: ", task_id)
        print("Subject: ", subject)
        print("Body: ", body)
        print("Options: ", options)
        print("Params: ", params)
        print("--------------------------------")
        return {
            "subject": subject,
            "body": body,
            "options": options,
            "params": params,
        }
    elif response.status_code == 404:
        print("--------------------------------")
        print("404 - No required action found for: ", dag_id, "DAG Run: ", dag_run_id, "Task: ", task_id)
        print("Response: ", response.json())
        print("--------------------------------")
        return None
    else:
        print("--------------------------------")
        print("Error: ", response.status_code)
        print("Response: ", response.json())
        print("--------------------------------")
        return None


def _add_hitl_response(
    dag_id: str, dag_run_id: str, task_id: str, options: list[str], params: dict[str, Any]
):
    url = f"{_HOST}/api/v2/hitlDetails/{dag_id}/{dag_run_id}/{task_id}"
    headers = {"Authorization": f"Bearer {_get_jwt_token()}"}
    chosen_options = [_pick_option(options)]
    if params:
        # iterating over the params dict yields the param names
        params_input = {param: _pick_params(param) for param in params}
    else:
        params_input = {}
    response = requests.patch(
        url, headers=headers, json={"chosen_options": chosen_options, "params_input": params_input}
    )
    if response.status_code == 200:
        print("--------------------------------")
        print("Hitl response added for DAG: ", dag_id, "DAG Run: ", dag_run_id, "Task: ", task_id)
        print("Chosen options: ", chosen_options)
        print("Params input: ", params_input)
        print("Response status code: ", response.status_code)
        print("Response: ", response.json())
        print("--------------------------------")
    elif response.status_code == 409:
        print("--------------------------------")
        print("409 - Already updated action for: ", dag_id, "DAG Run: ", dag_run_id, "Task: ", task_id)
        print("Response: ", response.json())
        print("--------------------------------")
    else:
        print("--------------------------------")
        print("Error: ", response.status_code)
        print("Response: ", response.json())
        print("--------------------------------")


def main():
    dag_runs = _get_running_dagruns_for_dag(_DAG_ID)["dag_runs"]
    if not dag_runs:
        print("No running dag runs found for DAG: ", _DAG_ID)
    else:
        for dag_run in dag_runs:
            hitl_details = _get_hitl_details(_DAG_ID, dag_run["dag_run_id"], _TASK_ID)
            if hitl_details:
                _add_hitl_response(
                    _DAG_ID, dag_run["dag_run_id"], _TASK_ID, hitl_details["options"], hitl_details["params"]
                )


if __name__ == "__main__":
    main()
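The script above sends whatever the user types, so a typo only surfaces as an error response from the API. To catch it earlier, you could validate the response locally before sending the PATCH request. The sketch below does this in plain Python; build_hitl_payload is a hypothetical helper, and its multiple flag mirrors the operator’s multiple parameter but must be supplied by the caller, since the sketch makes no assumption about which fields the GET response contains. You could call it in _add_hitl_response instead of building the JSON body inline.

```python
from typing import Any


def build_hitl_payload(
    chosen: list[str], options: list[str], params_input: dict[str, Any], multiple: bool = False
) -> dict[str, Any]:
    """Validate a response locally and return the JSON body for the PATCH request."""
    if not chosen:
        raise ValueError("choose at least one option")
    if not multiple and len(chosen) > 1:
        raise ValueError("only one option may be chosen")
    invalid = [option for option in chosen if option not in options]
    if invalid:
        raise ValueError(f"not a valid option: {invalid}")
    # Same body shape the script sends: chosen_options plus params_input
    return {"chosen_options": chosen, "params_input": params_input}


payload = build_hitl_payload(["ACH Transfer"], ["ACH Transfer", "Wire Transfer"], {"expense_amount": 500})
```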