
How to build a decision tracing context graph with Apache Airflow®


2023 was the year of the LLM.

2024 was the year of the Agent.

2025 was the year of the Human-in-the-loop.

You’ve done it all: integrated LLMs into your workflows, made lobster agents run 24/7, and implemented human approval steps before sending that AI-generated compliance report to the authorities (hopefully). What is the next step?

The phrase of 2026 has already been decided: decision tracing context graph. It is the full, end-to-end record of not just results and outputs, but the decisions, actions, and reasons that went into creating that 5-star customer support response, top-selling product, differentiating feature, and disruptive new strategy.

Your Agents can only get smarter if they understand how decisions in your organization are made. This is why decision traces have become “the single most valuable asset for companies in the era of AI”.

But what does a decision tracing context graph look like in practice? This blog walks through a simple but powerful example: a Dag that uses AI to answer support tickets, with a human deciding whether the answer needs adjustments through a Slack human-in-the-loop Airflow plugin. As soon as the decision is made, Airflow gathers the full decision context and makes it available to the AI agent for future tickets.

Today’s business decisions are tomorrow’s AI context.

If you are in a hurry, you can find the full code for the decision tracing pipeline, including the Slack HITL plugin, in this GitHub repository.

Why use Airflow to build a context graph?

The context going into a decision can seem almost ephemeral: it is spread across tools, humans working and communicating in different applications, and agents running in the cloud, on-prem, and on edge devices. The full picture, the full decision trace, can only be gathered by a platform that touches everything.

Airflow is the industry standard for workflow orchestration precisely because it can connect to anything that has an API, run any task you can define in code, and knows your data ecosystem inside-out.

You’ve already been building your context graph with every Airflow Dag you write. Your Dags and task dependencies are a living record of your organization’s inner workings, and all of them are nodes in your context graph.

The missing piece is capturing why a human approved something, rejected it, or asked for changes, and feeding that context back to the AI Agent.

Step 1: Define the decision points

Airflow is the source of truth and Dags form the context graph. With the Airflow 3.1 human-in-the-loop (HITL) feature you can track decisions natively with your Airflow Dags by adding a HITLOperator.

Once the Dag run reaches this operator, the task defers itself: it hands off to an asynchronous process in the Triggerer component (without even taking up a worker slot) and waits for you, or any entity (human or agent) in your organization, to choose an option and submit additional information in pre-defined fields.


Figure: A screenshot of the example Dag during a run having reached the review_ai_response task defined with the HITLOperator.

Let’s look at a concrete example!

Most organizations are running AI Agents to create first drafts of answers to support tickets. The ai_support_ticket_system Dag does exactly that: it fetches a pending support ticket and passes it to the customer_success_agent, orchestrated using the Airflow AI SDK.

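    @task.agent(agent=customer_success_agent)
    def generate_ai_response(ticket: dict):
        import json

        # Serialize the ticket; the returned string is what the agent receives as input.
        ticket_str = json.dumps(ticket)
        return ticket_str

    # Wire the fetched ticket into the agent task.
    _generate_ai_response = generate_ai_response(ticket=_fetch_pending_ticket)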

The agent generates a response to the ticket. This is where a lot of organizations switch to a manual workflow, with a human approving or denying the AI response in a third-party system. The why behind a specific ticket being approved or denied is lost.

In order to level up, you need to gather that context gold by adding a HITLOperator. This operator displays the original support ticket to your human in the loop, alongside the AI generated response.

The human can now choose one of three options: Approve the AI response, Respond Manually instead, or Escalate to the customer engineering team. Crucially, the human is asked to provide a reason for their decision. Airflow retains this information, and we’ll collect it in Step 3!

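from datetime import timedelta

from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import Param

_review_ai_response = HITLOperator(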
    task_id="review_ai_response",
    subject="🎫 AI Support Response Ready for Review",
    body="""**Please review the AI-generated support ticket response below:**

{{ ti.xcom_pull(task_ids='format_approval_request')['ticket_info'] }}

**AI Summary:**
{{ ti.xcom_pull(task_ids='format_approval_request')['summary'] }}

....""",
    options=[
        "Approve AI Response",
        "Respond Manually",
        "Escalate to CRE",
    ],
    multiple=False,
    defaults=["Escalate to CRE"],
    params={"notes": Param(type=["string", "null"], default="...")},
    execution_timeout=timedelta(
        hours=4
    ),  # after 4 hours the default option will be selected
)

Why even have a human-in-the-loop? Well, it depends on how mission-critical your use case is and how costly a mistake would be. AI Agents can do a lot, but it is not yet wise to let them run fully unsupervised, as recent experiments have shown. Unless you are planning a PlayStation 5 giveaway for your customers, you still likely want to double-check AI outputs before they reach important systems or external stakeholders.

Step 2: Meet users where they are at

The human-in-the-loop feature comes with an intuitive interface, directly in the Airflow UI, which is great if you, the Dag author, are the human in the loop. But non-technical stakeholders might not have access to your Airflow environment or may struggle to navigate it. Also, no one wants to have to log into yet another UI and have yet another browser and mental tab open to deal with.

Luckily, the HITL feature has full API support, which means that when you use Airflow to build your decision trace, you can go to where decisions are made, like email, MS Teams, or Slack. Don’t ask business users to leave their favorite tools.
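To make "full API support" a bit more concrete, here is a minimal sketch of what an integration has to do: turn the reviewer's choice into a request against the Airflow REST API. The helper only builds the request and does not send it. The `chosen_options` and `params_input` field names mirror the HITL details shown in Step 3; the function name, the HTTP method, the URL path, and the example run ID are assumptions for illustration, so check the REST API reference for your Airflow version before relying on them.

```python
import json

# The three options offered by the review_ai_response task in this post.
VALID_OPTIONS = ("Approve AI Response", "Respond Manually", "Escalate to CRE")


def build_hitl_response(dag_id, dag_run_id, task_id, chosen_option, notes, map_index=-1):
    """Build (but do not send) a request that answers a pending HITL task.

    Hypothetical helper: the method and path are assumptions, not a confirmed route.
    """
    if chosen_option not in VALID_OPTIONS:
        raise ValueError(f"Unknown option: {chosen_option!r}")
    if not notes or not notes.strip():
        # The decision trace is only useful if the reviewer explains the why.
        raise ValueError("A reason for the decision is required")

    path = (
        f"/dags/{dag_id}/dagRuns/{dag_run_id}"
        f"/taskInstances/{task_id}/{map_index}/hitlDetails"  # assumed path
    )
    body = {
        "chosen_options": [chosen_option],
        "params_input": {"notes": notes.strip()},
    }
    return {"method": "PATCH", "path": path, "body": json.dumps(body)}


request = build_hitl_response(
    dag_id="ai_support_ticket_system",
    dag_run_id="manual__example",  # placeholder run ID
    task_id="review_ai_response",
    chosen_option="Respond Manually",
    notes="Dark mode is already on the roadmap; check the roadmap first.",
)
print(request["path"])
print(request["body"])
```

Rejecting an empty reason at this layer is a design choice of the sketch: it keeps decisions without a why out of the trace, whichever tool the reviewer answers from.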

For this example, I built a human-in-the-loop FastAPI plugin for Slack. The end user does not need to know anything about Airflow or context graphs. They just get a friendly Slack message asking them for input.

Figure: Start of the Slack message sent by the Airflow plugin asking the customer support human to review the AI generated answer to the support request.

In this case John had a feature request: Adding dark mode to our app. The AI response was friendly but the agent missed that this feature was already on the roadmap. Time for a manual override by our human, including the reason for this decision: that they needed to check the roadmap first!

Every decision needs to be justified with a reason. This is our decision trace.

Figure: Slack response form showing the manual override of the AI answer including the reason for the decision.

As soon as the human submits their decision in Slack, the information is sent back to Airflow via an AWS Lambda function and stored alongside the reasoning for the decision as HITL details. The Dag run continues and the manual response is either sent directly to the customer, or given to another agent to format before being sent.
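What "the Dag run continues" could look like can be sketched as a small routing function that maps the reviewer's choice to the next step. This assumes the HITL result exposes `chosen_options` in the same shape as the HITL details in Step 3. Only the option labels come from the Dag above; the function name and the downstream task names are hypothetical.

```python
# Hypothetical downstream task names; only the option labels come from the Dag above.
ROUTES = {
    "Approve AI Response": "send_ai_response",
    "Respond Manually": "send_manual_response",
    "Escalate to CRE": "escalate_to_cre",
}


def next_step(hitl_result):
    """Return the name of the step to run for a single-choice HITL result."""
    chosen = hitl_result.get("chosen_options") or []
    if len(chosen) != 1:
        # The operator in this post is configured with multiple=False.
        raise ValueError(f"Expected exactly one chosen option, got {chosen!r}")
    try:
        return ROUTES[chosen[0]]
    except KeyError:
        raise ValueError(f"Unknown option: {chosen[0]!r}") from None


result = {
    "chosen_options": ["Respond Manually"],
    "params_input": {"notes": "Needed to check the roadmap first."},
}
print(next_step(result))
```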

You can find the full Slack HITL plugin in the companion GitHub repository here.

Step 3: Gather your decisions

As soon as the decision has been recorded in Airflow, a second Dag, gathering_decision_trace, starts running, ready to gather the decision trace. The core of this Dag is a task querying the hitlDetails endpoint of the Airflow REST API, giving access to the body containing the original AI response, as well as the chosen option and inputs from the human in Slack. You can find the full Dag here.

    @task
    def query_hitl_decisions(dag_id: str, dag_run_id: str) -> list:
        endpoint = f"/dags/{dag_id}/dagRuns/{dag_run_id}/hitlDetails"
        result = make_api_request(endpoint)

        hitl_details_list = []
        for hitl in result.get("hitl_details", []):
            task_instance = hitl.get("task_instance", {})
            hitl_details_list.append({
                "task_id": task_instance.get("task_id"),
                "map_index": task_instance.get("map_index"),
                "chosen_options": hitl.get("chosen_options", []),
                "params_input": hitl.get("params_input", {}),
                "responded_at": hitl.get("responded_at"),
                "responded_by_user": hitl.get("responded_by_user"),
                "response_received": hitl.get("response_received"),
                "subject": hitl.get("subject"),
                "body": hitl.get("body"),
                "options": hitl.get("options"),
            })

        print(f"Retrieved {len(hitl_details_list)} HITL decisions: {hitl_details_list}")
        return hitl_details_list
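The task above relies on a `make_api_request` helper that is not shown in this post. A minimal sketch of such a helper, using only the standard library, might look like the following. The base URL, the bearer-token authentication, and the extra parameters are assumptions for illustration; the helper in the companion repository may work differently.

```python
import json
import urllib.request


def make_api_request(
    endpoint,
    base_url="http://localhost:8080/api/v2",  # assumed local Airflow API address
    token="",  # assumed: an access token for the Airflow API
    opener=urllib.request.urlopen,  # injectable so the helper can be tested offline
):
    """Send a GET request to an Airflow REST API endpoint and return the parsed JSON."""
    request = urllib.request.Request(
        url=f"{base_url.rstrip('/')}{endpoint}",
        headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        },
        method="GET",
    )
    with opener(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

Passing the opener as a parameter is only there to make the sketch easy to exercise without a running Airflow instance; in a Dag you would call it with the endpoint and credentials for your own environment.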

Because Airflow touches every system involved in this decision, we get a full trace of what happened and why. This holds true even if we add more tools and humans into the decision graph; no matter how complex, we can gather a full record of the decision from Airflow.

The gathering_decision_trace Dag stores the decision trace as a Markdown file, ready for the Agent to read.
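The exact file layout lives in the companion repository. As a rough sketch of the idea, one HITL decision, shaped like the dictionaries returned by `query_hitl_decisions`, could be rendered to Markdown like this. The function name, the headings, and the sample values are assumptions for illustration.

```python
def render_decision_trace(hitl):
    """Render one HITL decision dict as a small Markdown document."""
    options = ", ".join(hitl.get("chosen_options") or ["(no option chosen)"])
    notes = (hitl.get("params_input") or {}).get("notes") or "(no reason given)"
    lines = [
        f"# Decision trace: {hitl.get('task_id') or 'unknown task'}",
        "",
        f"- Decision: {options}",
        f"- Reason: {notes}",
        f"- Responded at: {hitl.get('responded_at')}",
        "",
        "## Reviewed content",
        "",
        hitl.get("body") or "",
    ]
    return "\n".join(lines) + "\n"


# Sample values for illustration only.
example = {
    "task_id": "review_ai_response",
    "chosen_options": ["Respond Manually"],
    "params_input": {"notes": "Feature is already on the roadmap; check it first."},
    "responded_at": "2026-01-01T12:00:00Z",
    "body": "Ticket: please add dark mode to the app.",
}
print(render_decision_trace(example))
```

Keeping the decision and the reason at the top of the file means an agent reading many traces sees the why before the full ticket text.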

Step 4: Profit! Your Agent is now smarter

Two days later: Another user has a feature request, for more granular cost attribution!

This time the agent is prepared! It knows from the previous human decision that it should check the roadmap for upcoming features. The generated response is now flawless.

This is possible because the agent has access to a tool called read_all_decision_traces that can access all previous decisions with a full explanation of the reasoning that went into them, as well as access to the product roadmap.

customer_success_agent = Agent(
    model="gpt-4o-mini",
    system_prompt="""
    You are a friendly and helpful support agent generating answers to tickets.
    Make sure to address the customer by name in the response.

    Tools:
    - `read_all_decision_traces`: Read all decision traces for previous reviews of your responses.
    - `check_the_roadmap`: Check the roadmap for the requested feature.
    """,
    output_type=TicketResponse,
    tools=[read_all_decision_traces, check_the_roadmap],
)
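The `read_all_decision_traces` tool itself is not shown in this post. A minimal sketch, assuming the traces are stored as Markdown files in a single directory, could look like this. The directory name, the separator, and the fallback message are assumptions; the tool in the companion repository may be implemented differently.

```python
import tempfile
from pathlib import Path


def read_all_decision_traces(trace_dir="decision_traces"):
    """Return the contents of all Markdown decision traces in trace_dir as one string."""
    directory = Path(trace_dir)
    if not directory.is_dir():
        return "No decision traces recorded yet."
    # Sort by file name so the agent sees the traces in a stable order.
    traces = [
        path.read_text(encoding="utf-8") for path in sorted(directory.glob("*.md"))
    ]
    if not traces:
        return "No decision traces recorded yet."
    return "\n\n---\n\n".join(traces)


# Usage example with a temporary directory standing in for the real trace store.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "001_dark_mode.md").write_text(
        "# Decision trace\n- Decision: Respond Manually\n", encoding="utf-8"
    )
    print(read_all_decision_traces(tmp))
```

Returning a plain message instead of raising when no traces exist keeps the agent's first run, before any human decision has been recorded, from failing on the tool call.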

Conclusion

This simple example shows a very powerful pattern that can create AI systems that automatically learn not just from outputs, but from the why that went into a decision. Checking the roadmap before responding to a feature request might be pretty straightforward, but this pattern can be used for much more complex decisions combining input from several humans and tools.

When you track the decision tracing context graph with Airflow, your lobsters will only get smarter over time!

If you want to learn more about human-in-the-loop operators and other features in Airflow 3 used for AI applications, check out the Orchestrate LLMs and Agents with Apache Airflow® ebook.

Build, run, & observe your data workflows.
All in one place.

Try Astro today and get up to $20 in free credits during your 14-day trial.