Synchronous dag execution

Synchronous dag execution refers to the ability, available in Airflow 3.1+, to trigger a dag run via an API call and wait for it to complete before returning XCom values pushed by one or more tasks in the dag run. This is useful both for single dag runs and for cases where the same dag is triggered multiple times in parallel.

Synchronous dag execution was added as an experimental feature in Airflow 3.1.

Assumed knowledge

When to use synchronous dag execution

Synchronous dag execution lets you use Airflow as the backend for services that process user requests from a frontend application such as a website, mobile app, or Slack bot. Common use cases include:

  • Inference execution: A user provides input to a pipeline that interacts with one or more LLMs and/or AI agents to generate a response. The response is served back to the user as soon as the dag has completed running.
  • Ad-hoc requests: Non-technical stakeholders request data analyses that use a dag to retrieve the desired result.
  • Data submission: Non-technical users can submit their data to a dag to be processed and get immediate feedback on the status of the request and the result.

API Endpoint

The endpoint to wait for a dag run to complete is:

GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait

It includes the following path parameters:

  • dag_id: (Mandatory) The id of the dag to wait for.
  • dag_run_id: (Mandatory) The id of the dag run to wait for.

The query parameters are:

  • interval: (Mandatory) Seconds to wait between dag run state checks.
  • result: (Optional) Array of strings or null. A list of task ids from which to pull the XCom value pushed under the return_value key.

Calling this endpoint for a running dag run blocks until the dag run completes, streaming state updates as newline-delimited JSON at the given interval. If any XComs are requested in the result parameter, they are returned in the final line of the response upon dag run completion.
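For illustration, the request URL with its path and query parameters can be assembled as follows. The host, dag id, and dag run id values are placeholders; substitute your own.

```python
from urllib.parse import urlencode

# Placeholder values; substitute your own host, dag id, and dag run id.
host = "http://localhost:8080"
dag_id = "my_dag"
dag_run_id = "manual__2025-01-01T00:00:00"

# "result" may repeat, once per task id whose return_value XCom you want back.
query = urlencode({"interval": 5, "result": ["my_task"]}, doseq=True)
url = f"{host}/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait?{query}"
print(url)
```

Note that passing a list for result produces one result=<task_id> query parameter per task id, which is how you request XComs from multiple tasks.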

Example script

The following script creates a dag run for the my_dag dag and waits for it to complete. The response includes the XCom pushed under the return_value key by the my_task task.

import json

import requests

_USERNAME = "admin"
_PASSWORD = "admin"
_HOST = "http://localhost:8080"  # No trailing slash. To learn how to send API requests to Airflow running on Astro see: https://www.astronomer.io/docs/astro/airflow-api/

_DAG_ID = "my_dag"
_TASK_ID = "my_task"


def _get_jwt_token():
    token_url = f"{_HOST}/auth/token"
    payload = {"username": _USERNAME, "password": _PASSWORD}
    headers = {"Content-Type": "application/json"}
    response = requests.post(token_url, json=payload, headers=headers)
    response.raise_for_status()

    return response.json().get("access_token")


def _trigger_dag_run(dag_id: str):
    url = f"{_HOST}/api/v2/dags/{dag_id}/dagRuns"
    headers = {
        "Authorization": f"Bearer {_get_jwt_token()}",
        "Content-Type": "application/json",
    }
    payload = {
        "logical_date": None,
    }
    response = requests.post(url, headers=headers, json=payload)
    response.raise_for_status()

    return response.json()["dag_run_id"]


def _wait_for_dag_run_completion(dag_id: str, dag_run_id: str):
    url = f"{_HOST}/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait"
    headers = {
        "Authorization": f"Bearer {_get_jwt_token()}",
    }
    params = {
        "interval": 1,
        "result": [_TASK_ID],
    }
    response = requests.get(url, headers=headers, params=params)
    print(f"Status Code: {response.status_code}")

    # The endpoint streams newline-delimited JSON; each line is a state update.
    json_objects = []
    for line in response.text.strip().split("\n"):
        if line.strip():
            json_obj = json.loads(line)
            json_objects.append(json_obj)
            print(f"Status: {json_obj.get('state', 'unknown')}")

    if json_objects:
        last_status_update = json_objects[-1]
        xcom_results = last_status_update.get("results", {})
        print("Last status update: ", last_status_update)
        print("XCom results: ", xcom_results)
        return xcom_results


if __name__ == "__main__":
    _dag_run_id = _trigger_dag_run(_DAG_ID)
    _wait_for_dag_run_completion(_DAG_ID, _dag_run_id)

Running the script above returns an output similar to the following:

Status Code: 200
Status: queued
Status: running
Status: running
Status: success
Last status update: {'state': 'success', 'results': {'my_task': 'Hello World!'}}
XCom results: {'my_task': 'Hello World!'}
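Because the response body is newline-delimited JSON, each line can be parsed independently. The following minimal sketch parses a hypothetical body mirroring the sample output above, with the final line carrying the requested XCom results:

```python
import json

# Hypothetical response body, mirroring the sample output above.
body = """{"state": "queued"}
{"state": "running"}
{"state": "running"}
{"state": "success", "results": {"my_task": "Hello World!"}}"""

# Each non-empty line is one state update; the last one holds the results.
updates = [json.loads(line) for line in body.splitlines() if line.strip()]
final = updates[-1]
print(final["state"])            # success
print(final.get("results", {}))  # {'my_task': 'Hello World!'}
```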