Orchestrate AI tasks with Apache Airflow® and the Common AI provider

The Airflow Common AI provider is an Airflow provider package that contains several operators and other modules to add AI-based tasks to your Dags, from simple LLM calls to AI agents with access to Airflow-based tools. It is built on top of PydanticAI and can be used with any compatible model provider, including OpenAI, Anthropic, Gemini, AWS Bedrock, HuggingFace, and more.

In this guide you’ll learn:

  • Basic AI concepts to understand how to use the Common AI provider.
  • How to install the Common AI provider and connect Airflow to your model provider.
  • How to use the Common AI decorators and operators.
  • How to add toolsets, durable execution, and human-in-the-loop review to your agentic tasks.

Assumed knowledge

To get the most out of this guide, you should have existing knowledge of:

Concepts

The Common AI provider abstracts calls to LLMs (large language models) and LMMs (large multimodal models), often as an AI agent with tool access. The calls are made through PydanticAI, which provides a consistent interface for all compatible model providers.

| Concept | Description |
| --- | --- |
| LLM | Large language model. A model that takes text input and generates text output. |
| LMM | Large multimodal model. A model that takes input from multiple modalities (text, images, audio, video, and other inputs) and generates output in one or more modalities. |
| AI agent | An LLM or LMM that has access to a set of tools. Typically, agents perform multiple steps of output generation and tool calls to achieve a goal. The goal of an agent can range from providing output to performing complex tasks involving multiple systems. |
| Tools | Functions that an AI agent can call. Agents typically use tools to interact with an MCP server or API to perform an action in another system, for example retrieving data from a database or writing a file to object storage. |
| MCP | Model Context Protocol, a standard way for agents to connect to tools. You can think of an MCP server as an agent-callable interface for a tool or data source, often wrapping an API. |

As soon as you add tools to an AI agent, the agent can independently call those tools and perform any action that is possible through them. If you are adding tools, especially custom tooling with access to production systems, scope the agent's access to prevent destructive actions like dropping a table or deleting a file. Never rely on instructions in a prompt to keep your agent from performing actions; always enforce access control programmatically.

Keep in mind that LLMs and LMMs are not deterministic and therefore tasks using these models are not idempotent. This means you might get different results when rerunning or backfilling Dags that contain AI-based tasks, even if all inputs are the same.

Installing the Common AI provider

To use the Common AI provider, you need Airflow 3.0 or later. Install the provider by adding it to your requirements.txt file, and pin it to the latest version.

apache-airflow-providers-common-ai==<version>

Most operators in the Common AI provider depend on modules from the Airflow standard provider, which is pre-installed when using the Astro CLI. Additionally, you’ll need to install the Airflow Common SQL provider when using SQLToolset, @task.llm_sql, @task.llm_schema_compare, or other features that read database metadata through a DbApiHook.

apache-airflow-providers-standard==<version>
apache-airflow-providers-common-sql==<version>

Setting the PydanticAI connection

The Common AI provider uses the same operators and decorators across many model providers by interacting with them through PydanticAI. You can switch providers by updating the Airflow connection.

The connection has the following format:

AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
"conn_type": "pydanticai",
"host": "<your_host>",
"password": "<your_api_key>",
"extra": {
"model": "<your_provider>:<your_model>"
}
}'

You can set a default model for the connection by specifying model in the connection extra field in the format <your_provider>:<your_model> (for example anthropic:claude-opus-4-7). Models can be overridden at the task level, as long as the model is accessible through the provided Airflow connection.
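If you generate the connection value in a script, for example when templating a .env file, serializing it with Python's json module avoids quoting and escaping mistakes. The following is a minimal sketch; build_pydanticai_conn is a hypothetical helper that only mirrors the connection format shown above, and the host and API key are placeholders:

```python
import json


def build_pydanticai_conn(host: str, api_key: str, model: str) -> str:
    """Return the JSON value for AIRFLOW_CONN_PYDANTICAI_DEFAULT.

    Hypothetical helper: it only mirrors the connection format shown above.
    """
    if ":" not in model:
        # The default model must use the <your_provider>:<your_model> format.
        raise ValueError(f"model must look like '<provider>:<model>', got {model!r}")
    conn = {
        "conn_type": "pydanticai",
        "host": host,
        "password": api_key,
        "extra": {"model": model},
    }
    return json.dumps(conn)


value = build_pydanticai_conn(
    host="<your_host>",
    api_key="<your_api_key>",
    model="anthropic:claude-opus-4-7",
)
print(f"AIRFLOW_CONN_PYDANTICAI_DEFAULT='{value}'")
```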

Decide which decorator to use

The following table lists each decorator and operator in the Common AI provider and describes typical use cases.

| Decorator or operator | Description |
| --- | --- |
| @task.agent (AgentOperator) | Multi-step agent with toolsets, optional durable caching, or iterative human-in-the-loop review. Use it when a model needs access to external systems to perform a task. For example, an agent could assemble a feature proposal by retrieving recent chat messages, the current product roadmap, and a support ticket with a feature request. |
| @task.llm (LLMOperator) | Single-turn model call with optional approval and optional direct edits to the output in the Airflow UI. Use this decorator when you only need a single model call. |
| @task.llm_sql (LLMSQLQueryOperator) | Natural language to SQL with optional access to the table schema, and SQL validation. This decorator works well to create Dags that can run ad-hoc queries on demand. |
| @task.llm_branch (LLMBranchOperator) | Let the model pick one or more downstream tasks from branches. Use it when you want to pick different workflow paths based on unstructured input, for example routing support tickets to different teams. |
| @task.llm_file_analysis (LLMFileAnalysisOperator) | Analyze files from object or local storage. This decorator is useful for document summarization across a set of files. |
| @task.llm_schema_compare (LLMSchemaCompareOperator) | Compare table schemas across databases or mixed sources and summarize differences between them. Use this decorator when you have frequent schema changes that you need to detect to adjust downstream tasks. |

@task.agent

The @task.agent decorator and AgentOperator let you run a PydanticAI agent as a task in your Airflow Dag.

from datetime import timedelta

from airflow.sdk import task
from pydantic import BaseModel
from pydantic_ai import FunctionToolset


class MyToolset(FunctionToolset): ...  # your custom toolset


class MyOutputClass(BaseModel): ...  # your custom output class


@task.agent(
    llm_conn_id="pydanticai_default",  # required: your Airflow connection ID
    model_id="<your_provider>:<your_model>",  # default: None (uses the model set in the connection)
    # Optional parameters:
    system_prompt="<your_system_prompt>",  # default: ""
    output_type=MyOutputClass,  # can be a primitive type or a subclass of pydantic.BaseModel, default: str
    toolsets=[MyToolset()],  # can be set to a list of pydantic_ai.toolset instances and/or Airflow hooks, default: None
    enable_tool_logging=True,  # default: True
    agent_params={"tool_timeout": 60.0},  # default: None
    durable=False,  # default: False
    enable_hitl_review=True,  # default: False
    max_hitl_iterations=5,  # default: 5
    hitl_timeout=timedelta(minutes=5),  # default: None
    hitl_poll_interval=10.0,  # default: 10.0
)
def my_agentic_task(my_input: str) -> str:
    return f"This is the user prompt! {my_input}"


my_agentic_task(my_input="Say hello!")

The string returned by the @task.agent decorated function is the user prompt input to the agent. When using the AgentOperator directly, providing a user prompt with the prompt parameter is required.

The only other mandatory parameter is llm_conn_id, the Airflow connection ID to use for the LLM.

The following optional parameters are available for the @task.agent decorator:

  • model_id: The model to use for the agent. If not given, the operator uses the model set in the Airflow connection. You need to set the model in the format <your_provider>:<your_model>, for example anthropic:claude-opus-4-7.
  • system_prompt: The system prompt to use for the agent, which is loaded into context before the user prompt. It is common to use the system prompt for general instructions about the agent's role and task, as well as information about available tools. Note that instructions given in the prompt are not guaranteed to be followed.
  • output_type: The format for the output from the agent. You can use primitive types like str, int, float, bool, or a subclass of Pydantic BaseModel for more complex structured outputs. Defaults to str. The output_type format is enforced, in contrast to instructions in the prompt.
  • toolsets: The toolsets the agent has access to as a list of pydantic_ai.toolset instances. Defaults to None.
  • enable_tool_logging: If True, every toolset is wrapped in a LoggingToolset that logs tool calls with timing at INFO level and arguments at DEBUG level. Defaults to True.
  • agent_params: Additional keyword arguments passed to the agent constructor. See the PydanticAI agent documentation for a list of available parameters.
  • durable: Whether to enable step-level caching of model responses and tool results. Note that in order to use durable execution you need to set AIRFLOW__COMMON_AI__DURABLE_CACHE_PATH. See durable execution. Defaults to False. Cannot be used with human-in-the-loop review.
  • enable_hitl_review: Whether to enable human-in-the-loop review through an Airflow plugin. Defaults to False. Needs Airflow 3.1+ and cannot be used with durable execution.
  • max_hitl_iterations: The maximum number of iterations of human-in-the-loop review. Defaults to 5.
  • hitl_timeout: The timeout for the human-in-the-loop review as a timedelta object. Defaults to None.
  • hitl_poll_interval: The interval, in seconds, between polls for the reviewer's action during human-in-the-loop review. Defaults to 10.0.
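Conceptually, enable_tool_logging wraps each tool call so that timing is logged at INFO level and arguments at DEBUG level. The following standard-library sketch illustrates that pattern with a plain decorator. It is not the provider's LoggingToolset, and the names log_tool_call and get_row_count are hypothetical:

```python
import functools
import logging
import time
from typing import Any, Callable

logger = logging.getLogger("tool_calls")


def log_tool_call(func: Callable[..., Any]) -> Callable[..., Any]:
    """Log arguments at DEBUG level and elapsed time at INFO level."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        logger.debug("Calling tool %s with args=%r kwargs=%r", func.__name__, args, kwargs)
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Logged whether the tool succeeded or raised.
            elapsed = time.perf_counter() - start
            logger.info("Tool %s finished in %.3fs", func.__name__, elapsed)

    return wrapper


@log_tool_call
def get_row_count(table: str) -> int:
    # Hypothetical tool: a real toolset would query an external system here.
    return {"orders": 42}.get(table, 0)


logging.basicConfig(level=logging.DEBUG)
print(get_row_count("orders"))  # 42
```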

Toolsets

AI agents can use tools to perform actions in another system. Tools are typically implemented as functions that can be called by the agent. A toolset is a collection of such functions in a class.

When using the Common AI provider, your agents can use three types of toolsets:

  • Pre-built toolsets included in the Common AI provider: SQLToolset and MCPToolset.
  • Any Airflow hook, wrapped in the HookToolset class. The hook methods you allow become the tools the agent can call.
  • Custom toolsets you can implement yourself by subclassing a pydantic_ai.toolset class.

Pre-built toolsets

The SQLToolset is a curated toolset that gives your agent access to a SQL database, only allowing the following methods: list_tables, get_schema, query (SELECT only by default), and check_query. It can be used with any SQL database that is supported by the DbApiHook.

from airflow.providers.common.ai.toolsets.sql import SQLToolset

toolsets=[
    SQLToolset(
        db_conn_id="my_sql_conn",
        allowed_tables=["my_table"],  # restrict which tables the agent can access through list_tables and get_schema
        schema="my_schema",  # database schema/namespace for table listing and introspection
        allow_writes=False,  # if True, modify operations are allowed (insert, update, delete). Default: False.
        max_rows=1000,  # maximum number of rows to return from a query. Default: 50.
    )
],

The allowed_tables parameter does not parse or validate table references in SQL queries. An LLM can still query tables outside this list if it guesses the name. For query-level restrictions, use database-level permissions (for example a read-only role with grants limited to specific tables).

Setting allow_writes=True allows the agent to perform modify operations (insert, update, delete) on the database. Use at your own risk.

The MCPToolset is a toolset that gives your agent access to an MCP server. It uses the MCPHook to retrieve credentials from an Airflow connection and creates a PydanticAI MCP server instance.

from airflow.providers.common.ai.toolsets.mcp import MCPToolset

toolsets=[MCPToolset(mcp_conn_id="mcp_default")],

Set the MCP server connection as an Airflow connection. There are two main types of transports for MCP servers:

  • Streamable HTTP: Uses HTTP to stream responses, recommended for remote servers. If your host does not require authentication, you can omit the password field.

    AIRFLOW_CONN_MCP_DEFAULT='{
    "conn_type": "mcp",
    "host": "http://localhost:3001/mcp",
    "password": "<your_auth_token>",
    "extra": {
    "transport": "http"
    }
    }'
  • stdio: Runs the MCP server as a subprocess communicating over stdin/stdout.

    AIRFLOW_CONN_MCP_DEFAULT='{
    "conn_type": "mcp",
    "extra": {
    "transport": "stdio",
    "command": "uvx",
    "args": ["-m", "mcp_server"]
    }
    }'

Using Airflow hooks as toolsets

You can use any Airflow hook as a toolset by wrapping it in the HookToolset class. The hook methods are the tools the agent can call. You must list the methods to expose in the allowed_methods parameter; auto-discovery is intentionally disabled for safety.

from airflow.providers.common.ai.toolsets.hook import HookToolset

toolsets=[
    HookToolset(
        hook=MyHook(conn_id="my_conn"),  # MyHook is a placeholder for any Airflow hook class
        allowed_methods=["method1", "method2"],  # only these hook methods become tools
        tool_name_prefix="my_prefix_",  # optional, default: ""
    )
],

You can explore available hooks in the Airflow registry.
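The reasoning behind allowed_methods is that the agent should only ever see methods you have explicitly approved. The following standard-library sketch shows that allow-list pattern in isolation. AllowListedTools and FakeStorageHook are hypothetical stand-ins, not the provider's HookToolset implementation:

```python
from typing import Any, Callable


class FakeStorageHook:
    """Hypothetical hook with one safe and one destructive method."""

    def list_files(self, prefix: str) -> list[str]:
        return [f"{prefix}/a.csv", f"{prefix}/b.csv"]

    def delete_file(self, path: str) -> None:
        raise RuntimeError("destructive call should never be reachable")


class AllowListedTools:
    """Expose only explicitly allowed hook methods as named tools."""

    def __init__(self, hook: Any, allowed_methods: list[str], tool_name_prefix: str = "") -> None:
        self._tools: dict[str, Callable[..., Any]] = {}
        for name in allowed_methods:
            method = getattr(hook, name, None)
            if not callable(method):
                raise ValueError(f"{name!r} is not a callable method of {type(hook).__name__}")
            self._tools[f"{tool_name_prefix}{name}"] = method

    def tool_names(self) -> list[str]:
        return sorted(self._tools)

    def call(self, tool_name: str, **kwargs: Any) -> Any:
        if tool_name not in self._tools:
            # Anything not on the allow-list is rejected, regardless of the prompt.
            raise PermissionError(f"Tool {tool_name!r} is not allowed")
        return self._tools[tool_name](**kwargs)


tools = AllowListedTools(FakeStorageHook(), allowed_methods=["list_files"], tool_name_prefix="storage_")
print(tools.tool_names())  # ['storage_list_files']
print(tools.call("storage_list_files", prefix="reports"))  # ['reports/a.csv', 'reports/b.csv']
```

Because the allow-list is built when the wrapper is created, a method that is not listed never becomes reachable, even if the model guesses its name.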

Human-in-the-loop review

Human-in-the-loop review lets you iterate on the output of an agentic task by reviewing the agent output and providing feedback. An Airflow plugin adds a HITL Review tab to the task instance page in the Airflow UI. This is different from human-in-the-loop operators, which surface decisions under Required Actions.

from datetime import timedelta

from airflow.sdk import task


@task.agent(
    llm_conn_id="pydanticai_default",  # required: your Airflow connection ID
    enable_hitl_review=True,
    max_hitl_iterations=5,
    hitl_timeout=timedelta(minutes=5),
    hitl_poll_interval=10.0,
)
def my_agentic_task(my_input: str) -> str:
    return f"This is the user prompt! {my_input}"


my_agentic_task(my_input="Say hello!")

On each iteration you have three options:

  • Approve: The task succeeds with the current assistant output and the HITL review ends.
  • Provide feedback to the agent and click Send: The agent takes the feedback as the next user prompt and regenerates the output. The maximum number of iterations is controlled by the max_hitl_iterations parameter.
  • Reject: The task fails and the next iteration is not started.

Airflow UI for an agent task instance with the HITL Review tab selected: multi-turn thread with user feedback and assistant replies by iteration, a feedback field, Send, and Approve and Reject actions.

The hitl_timeout parameter provides an optional time limit for the entire HITL review phase, that is, all review rounds combined. When that limit is exceeded, the task fails. The hitl_poll_interval parameter is the number of seconds to wait between polls for the reviewer action on the human-action XCom key.
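The interplay of hitl_timeout (one deadline for all rounds) and hitl_poll_interval (the wait between checks) follows a common polling pattern. The sketch below models it with hypothetical generate and get_reviewer_action callables in place of the model call and the XCom lookup. It illustrates the timing logic only and makes no claim about the provider's internals; in particular, failing once max_hitl_iterations rounds pass without approval is an assumption:

```python
import time
from datetime import timedelta
from typing import Callable, Optional

# A reviewer action is ("approve" | "reject" | "feedback", feedback_text), or None while waiting.
ReviewerAction = Optional[tuple[str, str]]


def run_review_phase(
    generate: Callable[[Optional[str]], str],
    get_reviewer_action: Callable[[], ReviewerAction],
    max_hitl_iterations: int = 5,
    hitl_timeout: Optional[timedelta] = None,
    hitl_poll_interval: float = 10.0,
) -> str:
    """Return the approved output; raise on rejection, timeout, or too many rounds."""
    # One deadline for the whole review phase, shared by every round.
    deadline = None if hitl_timeout is None else time.monotonic() + hitl_timeout.total_seconds()
    output = generate(None)
    for _ in range(max_hitl_iterations):
        # Poll until the reviewer acts or the overall deadline passes.
        action = get_reviewer_action()
        while action is None:
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError("HITL review exceeded hitl_timeout")
            time.sleep(hitl_poll_interval)
            action = get_reviewer_action()
        kind, feedback = action
        if kind == "approve":
            return output
        if kind == "reject":
            raise RuntimeError("Reviewer rejected the output")
        # Feedback: regenerate with the feedback as the next user prompt.
        output = generate(feedback)
    # Assumption: running out of rounds without approval fails the task.
    raise RuntimeError("max_hitl_iterations reached without approval")


scripted_actions = iter([None, ("feedback", "Make it shorter."), None, ("approve", "")])
result = run_review_phase(
    generate=lambda feedback: "Long draft" if feedback is None else f"Revised after: {feedback}",
    get_reviewer_action=lambda: next(scripted_actions),
    hitl_timeout=timedelta(seconds=5),
    hitl_poll_interval=0.01,
)
print(result)  # Revised after: Make it shorter.
```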

You can have human-in-the-loop steps in Airflow Dags outside of agentic tasks by using standalone HITL operators.

Durable execution

The durable parameter allows you to enable step-level caching of model responses and tool results. When a task is retried, steps that have already finished do not run again; the task reads their results from the cache instead.
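The idea behind step-level caching can be shown without Airflow. Each step's result is stored under a stable key (here, the step's position in the run), and on a retry any step that already has a cached result is replayed instead of executed. This in-memory sketch uses a plain dict and hypothetical step names. The provider persists its cache to the path set in AIRFLOW__COMMON_AI__DURABLE_CACHE_PATH, and its keying scheme may differ:

```python
from typing import Any, Callable


class StepCache:
    """Replay cached step results by position; run and store new steps."""

    def __init__(self, store: dict[int, Any]) -> None:
        self.store = store  # stand-in for durable storage that survives retries
        self.position = 0
        self.replayed = 0
        self.executed = 0

    def run_step(self, step: Callable[[], Any]) -> Any:
        key = self.position
        self.position += 1
        if key in self.store:
            self.replayed += 1
            return self.store[key]
        result = step()  # if this raises, nothing is cached for this step
        self.store[key] = result
        self.executed += 1
        return result


durable_store: dict[int, Any] = {}
calls: list[str] = []


def make_step(name: str, fail: bool = False) -> Callable[[], str]:
    """Hypothetical model or tool step that records each real execution."""

    def step() -> str:
        calls.append(name)
        if fail:
            raise ConnectionError(f"{name} failed")
        return f"{name} result"

    return step


def attempt(fail_third_step: bool) -> StepCache:
    cache = StepCache(durable_store)
    for step in [
        make_step("model call 1"),
        make_step("tool call 1"),
        make_step("model call 2", fail=fail_third_step),
        make_step("tool call 2"),
    ]:
        cache.run_step(step)
    return cache


try:
    attempt(fail_third_step=True)  # first try: fails on the third step
except ConnectionError:
    pass

retry = attempt(fail_third_step=False)  # retry: replays two steps, runs two
print(f"replayed {retry.replayed} cached steps, executed {retry.executed} new steps")
# replayed 2 cached steps, executed 2 new steps
```

In this sketch, keying by position only works because replayed results are identical to the originals, so a non-deterministic model cannot send the retried run down a different path before the failure point.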

To enable durable mode you need to set the AIRFLOW__COMMON_AI__DURABLE_CACHE_PATH environment variable to the path where the cache will be stored. The destination can be a local temporary directory on the worker or remote object storage like S3 or GCS. After successful task execution the cache is deleted.

AIRFLOW__COMMON_AI__DURABLE_CACHE_PATH=/path/to/cache

To use remote object storage, add your connection ID to the path and provide the credentials in the connection, similar to the configuration of an Object Storage XCom Backend. To use S3 as the durable cache destination, set the following:

AIRFLOW__COMMON_AI__DURABLE_CACHE_PATH=s3://<your_conn_id>@my-bucket/some/prefix/
AIRFLOW_CONN_<your_conn_id>='{
"conn_type": "aws",
"login": "<your-aws-access-key>",
"password": "<your-aws-secret-key>",
"extra": {
"region_name": "<your-region>"
}
}'
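In a path like s3://<your_conn_id>@my-bucket/some/prefix/, the connection ID sits in the user-info part of the URL, ahead of the bucket name. The following sketch uses urllib.parse to show how such a path splits into its parts. It only illustrates the URL structure and is not how Airflow's object storage resolves the path; split_storage_path and aws_cache_conn are hypothetical names:

```python
from urllib.parse import urlsplit


def split_storage_path(path: str) -> dict[str, str]:
    """Split scheme://conn_id@bucket/key into its parts (empty strings when absent)."""
    parts = urlsplit(path)
    return {
        "scheme": parts.scheme,
        "conn_id": parts.username or "",
        # urllib lowercases the hostname; S3 bucket names are lowercase anyway.
        "bucket": parts.hostname or "",
        "prefix": parts.path.lstrip("/"),
    }


print(split_storage_path("s3://aws_cache_conn@my-bucket/some/prefix/"))
# {'scheme': 's3', 'conn_id': 'aws_cache_conn', 'bucket': 'my-bucket', 'prefix': 'some/prefix/'}
```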

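To run an agent in durable mode, set durable=True.

from airflow.sdk import task

@task.agent(
    llm_conn_id="pydanticai_default",  # required: your Airflow connection ID
    durable=True,
)
def my_durable_agentic_task(my_input: str) -> str:
    return f"This is the user prompt! {my_input}"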

When a durable agent retries and uses previously cached results you’ll see a log message like this:

[2026-04-26 20:37:33] INFO - Durable: replayed 3 cached steps (2 model, 1 tool), executed 4 new steps (2 model, 2 tool)

You can only use durable=True if enable_hitl_review=False.

Output type

Pass output_type to set the format of the agent output. It can be a primitive type like str, int, float, bool, or a subclass of Pydantic BaseModel for more complex structured outputs. Use the Field class to add descriptions to the fields for the agent to use.

from typing import Literal

from pydantic import BaseModel, Field


class MyOutputClass(BaseModel):
    my_field_one: Literal["A", "B", "C", "D", "F"] = Field(
        description=(
            "Letter grade for xyz..."
        )
    )
    my_field_two: str = Field(
        description="String that describes xyz..."
    )
    my_field_three: list[str] = Field(
        description=(
            "List of strings that describe xyz..."
        )
    )
    my_field_four: int = Field(
        description=(
            "Number that describes xyz..."
        )
    )

With MyOutputClass as the output_type, the agentic task always returns structured output containing these fields and their values, for example:

{
  "my_field_one": "A",
  "my_field_two": "In summary...",
  "my_field_three": ["In detail...", "In detail..."],
  "my_field_four": 10
}

@task.llm

The @task.llm decorator and LLMOperator make a single-turn LLM call and return the model output.

from datetime import timedelta

from airflow.sdk import task


@task.llm(
    llm_conn_id="pydanticai_default",  # required: your Airflow connection ID
    model_id="<your_provider>:<your_model>",  # default: None (uses the model set in the connection)
    # Optional parameters:
    system_prompt="",  # default: ""
    output_type=str,  # default: str; use a BaseModel subclass for structured output
    agent_params=None,  # default: None; passed to the PydanticAI Agent constructor
    require_approval=False,  # default: False; defer for human approve or reject in the UI
    approval_timeout=timedelta(minutes=10),  # default: None; max wait for approval
    allow_modifications=False,  # default: False; reviewer may edit text before approve
)
def my_llm_task(user_context: str) -> str:
    return user_context


my_llm_task(user_context="Say hello!")  # the callable's required argument must be passed

The string your @task.llm callable returns is the user prompt sent to the model. When you use LLMOperator directly, the prompt argument is required.

Optional parameters:

  • model_id: Overrides the model in the connection extra (format: <provider>:<model>).
  • system_prompt: System instructions loaded before the user prompt.
  • output_type: Return type for the run. Defaults to str. For structured JSON, set a subclass of Pydantic BaseModel, see Output type.
  • agent_params: Extra keyword arguments for the PydanticAI Agent constructor (for example model_settings). See the PydanticAI Agent parameters. Despite performing a single turn LLM call, you can still pass arguments to the agent constructor when using @task.llm.
  • require_approval: When True, the task defers after generation until a human approves or rejects through the approval UI. Defaults to False.
  • approval_timeout: Time limit to wait for a review when require_approval is True. When it is exceeded, the task fails. Defaults to None.
  • allow_modifications: When True with approval enabled, the reviewer can edit the generated text before approval; that edited value becomes the task result. Defaults to False.

When require_approval is True, the task defers after the model returns its output. In the Airflow UI, open the task instance, select the Required Action tab, read the generated text, edit the output if needed, and then click Approve or Reject.

Airflow UI for a deferred llm task instance on the Required Action tab: Markdown model output (events, pilot notes, cargo), an editable output field with optional edits before approval, and Approve and Reject controls.

Human-in-the-loop review behaves differently for @task.llm and @task.agent. With @task.llm, the task defers after generation until a human approves or rejects through the approval UI. There is only one approval step, and the reviewer can edit the model output before approval when allow_modifications is True. With @task.agent and enable_hitl_review=True, the task does not defer; it keeps running after output generation and waits on the HITL Review tab for approval, rejection, or feedback that triggers another iteration. Use max_hitl_iterations to cap how many review rounds run.

You can have human-in-the-loop steps in Airflow Dags outside of agentic tasks by using standalone HITL operators.

@task.llm_sql

The @task.llm_sql decorator and LLMSQLQueryOperator turn natural language into SQL. The operator can pull table metadata through a DbApiHook from db_conn_id, or you can supply a full schema string yourself. It generates SQL only; it does not run queries. Downstream tasks (for example SQLExecuteQueryOperator) can execute the string returned in XCom.

from datetime import timedelta

from airflow.sdk import task


@task.llm_sql(
    llm_conn_id="pydanticai_default",  # required: your Airflow connection ID
    db_conn_id="postgres_default",  # optional: connection that resolves to DbApiHook for accessing table schema
    table_names=["orders", "customers"],  # optional: tables to describe when using db_conn_id
    schema_context=None,  # optional: manual schema text; when set, skips db_conn_id access
    validate_sql=True,  # default True: validate generated SQL with sqlglot
    dialect=None,  # optional: for example "postgres"; inferred from the hook when None
    model_id="<your_provider>:<your_model>",  # optional, default None
    system_prompt="",  # optional: appended to the built-in SQL safety instructions
    agent_params=None,  # optional: passed to the PydanticAI Agent constructor
    require_approval=False,
    approval_timeout=timedelta(minutes=10),
    allow_modifications=False,
)
def my_nl_sql_task(question: str) -> str:
    return question


my_nl_sql_task(question="Which customers placed the most orders last month?")

The string your callable returns is the natural language prompt that describes the query you want. When you use LLMSQLQueryOperator directly, pass that text with the prompt argument. You must set llm_conn_id to the PydanticAI connection.

Additional parameters:

  • db_conn_id: Airflow connection used to access metadata about your database. The hook must be a DbApiHook. Omit when you fully describe the schema with schema_context.
  • table_names: List of table names to include when accessing the database with db_conn_id.
  • schema_context: Free-form schema description. When you set this, the operator does not access the database for metadata.
  • validate_sql: When True (default), generated SQL is checked with sqlglot before the task finishes.
  • allowed_sql_types: Tuple of allowed statement roots (defaults to read-only shapes such as Select, Union, Intersect, and Except in sqlglot). Be careful when adding write statements like Insert, Update, or Delete.
  • dialect: sqlglot dialect name (for example postgres, mysql). When None, the operator tries to infer it from the database hook.
  • datasource_config: Optional extra configuration for the datasource side of generation (see the provider source for structure when you need it).

Parameters inherited from LLMOperator (model_id, system_prompt, output_type, agent_params, require_approval, approval_timeout, allow_modifications) behave like they do for @task.llm. When require_approval is True and allow_modifications is True, a reviewer can edit the generated SQL; the provider re-validates edited SQL against the allowed_sql_types rules before returning it.

Airflow UI for a deferred llm_sql task instance on the Required Action tab: natural language prompt about available spacecraft, generated SELECT on the spacecraft table, an editable output field for the SQL, and Approve and Reject controls.
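Because @task.llm_sql only returns a SQL string, whatever runs that string downstream decides what the query can actually do. The following sketch stands in for a downstream execution task with SQLite from the standard library, using sqlite3's set_authorizer callback to deny anything other than reads and function calls. The orders table and generated_sql value are made up for illustration; in Airflow you would more typically pair SQLExecuteQueryOperator with a read-only database role:

```python
import sqlite3

# Authorizer action codes that a plain read query needs.
READ_ACTIONS = {sqlite3.SQLITE_SELECT, sqlite3.SQLITE_READ, sqlite3.SQLITE_FUNCTION}


def read_only_authorizer(action, arg1, arg2, db_name, trigger_or_view):
    """Allow reads and function calls; deny every other action (INSERT, DROP, ...)."""
    return sqlite3.SQLITE_OK if action in READ_ACTIONS else sqlite3.SQLITE_DENY


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 9.5), (2, 20.0)])
conn.commit()
conn.set_authorizer(read_only_authorizer)  # from here on, only reads are allowed


def execute_generated_sql(generated_sql: str) -> list[tuple]:
    """Run SQL returned by an upstream text-to-SQL task."""
    return conn.execute(generated_sql).fetchall()


generated_sql = "SELECT COUNT(*), SUM(amount) FROM orders"  # e.g. the XCom value of the upstream task
print(execute_generated_sql(generated_sql))  # [(2, 29.5)]

try:
    execute_generated_sql("DELETE FROM orders")
except sqlite3.DatabaseError as err:
    print(f"Blocked: {err}")
```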

@task.llm_branch

The @task.llm_branch decorator and LLMBranchOperator extend the LLMOperator with branching. At run time the operator reads downstream task IDs from the Dag, exposes them to the model as a constrained enum through PydanticAI structured output, and skips tasks the model does not select. Note that you need to create a dependency between the branch task and its downstream candidates.

from airflow.sdk import chain, dag, task


@dag
def example_llm_branch():
    @task.llm_branch(
        llm_conn_id="pydanticai_default",
        model_id="<your_provider>:<your_model>",  # optional, default None
        allow_multiple_branches=False,  # default False
        system_prompt="Route support tickets to the right team.",
        agent_params=None,  # optional: passed to the PydanticAI Agent constructor
    )
    def route_ticket(message: str) -> str:
        return f"Route this support ticket: {message}"

    @task
    def handle_billing():
        return "Handling billing issue"

    @task
    def handle_auth():
        return "Handling auth issue"

    @task
    def handle_general():
        return "Handling general issue"

    # The branch task must be directly upstream of every candidate task.
    chain(
        route_ticket("I was charged twice for my subscription."),
        [
            handle_billing(),
            handle_auth(),
            handle_general(),
        ],
    )


example_llm_branch()

The string your callable returns is the user prompt. When you use LLMBranchOperator directly, pass that text with the prompt argument. Set llm_conn_id to the PydanticAI connection. Additionally, you can set allow_multiple_branches to True to allow the model to return multiple downstream task IDs.

Parameters inherited from LLMOperator (model_id, system_prompt, agent_params, and the same optional human-in-the-loop approval fields as @task.llm) behave the same way as for a plain LLM task.
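The branching mechanism described for @task.llm_branch, constraining the model's answer to the set of downstream task IDs and skipping the rest, can be illustrated with the standard library enum module. This sketch is conceptual: the provider builds the enum through PydanticAI structured output, while here the model's choice is hard-coded and build_branch_enum and resolve_branches are hypothetical helpers:

```python
from enum import Enum


def build_branch_enum(downstream_task_ids: list[str]) -> type[Enum]:
    """Create an enum whose members are exactly the downstream task IDs."""
    return Enum("BranchChoice", {task_id: task_id for task_id in downstream_task_ids})


def resolve_branches(
    downstream_task_ids: list[str],
    model_choices: list[str],
    allow_multiple_branches: bool = False,
) -> tuple[list[str], list[str]]:
    """Validate the model's choices and return (tasks to run, tasks to skip)."""
    choice_enum = build_branch_enum(downstream_task_ids)
    if not allow_multiple_branches and len(model_choices) != 1:
        raise ValueError("exactly one branch must be selected")
    # Lookup by value raises ValueError for any task ID outside the allowed set.
    selected = [choice_enum(choice).value for choice in model_choices]
    skipped = [task_id for task_id in downstream_task_ids if task_id not in selected]
    return selected, skipped


run, skip = resolve_branches(
    ["handle_billing", "handle_auth", "handle_general"],
    model_choices=["handle_billing"],
)
print(run, skip)  # ['handle_billing'] ['handle_auth', 'handle_general']
```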

@task.llm_file_analysis

The @task.llm_file_analysis decorator and LLMFileAnalysisOperator analyze one file, a prefix, or a small set of files through a single LLM call. The operator resolves file_path with ObjectStoragePath, normalizes supported formats into text context, and optionally attaches PNG, JPG, or PDF inputs as multimodal payloads when multi_modal=True. For object storage URIs you can embed the connection ID in the path (for example s3://<conn_id>@my-bucket/prefix/) or set file_conn_id separately.

from datetime import timedelta

from airflow.sdk import task


@task.llm_file_analysis(
    llm_conn_id="pydanticai_default",
    file_path="s3://aws_default@my-bucket/reports/quarterly.pdf",
    file_conn_id=None,  # optional: overrides connection embedded in file_path
    multi_modal=True,  # default False; set True for vision-capable models on images/PDF
    max_files=20,  # default 20; caps files resolved from a prefix
    max_file_size_bytes=5 * 1024 * 1024,  # default 5 MiB per file
    max_total_size_bytes=20 * 1024 * 1024,  # default 20 MiB total
    max_text_chars=100_000,  # default 100000; how much text to read from the files
    sample_rows=10,  # default 10; preview rows for CSV, Parquet, Avro
    model_id="<your_provider>:<your_model>",
    system_prompt="",
    output_type=str,  # default str
    agent_params=None,
    require_approval=False,
    approval_timeout=timedelta(minutes=10),
    allow_modifications=False,
)
def review_quarterly_report() -> str:
    return "Extract the key revenue, risk, and compliance findings from this report."


review_quarterly_report()

The string your callable returns is the analysis prompt. When you use LLMFileAnalysisOperator directly, pass that text with the prompt argument.

Additional parameters:

  • file_path: File or prefix to analyze (local paths or object storage paths supported by Airflow object storage).
  • file_conn_id: Optional Airflow connection for the storage backend when it is not embedded in file_path.
  • multi_modal: When True, PNG, JPG, JPEG, and PDF inputs can be sent as binary attachments; requires a multimodal-capable model.
  • max_files, max_file_size_bytes, max_total_size_bytes, max_text_chars, sample_rows: Guardrails for listing, reading, and how much normalized text or row samples reach the model. Extra files under a large prefix are omitted and the operator notes that in the prompt context.

Optional dependencies: Parquet and Avro handling need the provider extras described in the Airflow documentation.
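The size guardrails can be pictured as a selection pass over the resolved files before anything is sent to the model. The following sketch applies max_files, max_file_size_bytes, and max_total_size_bytes to a list of (path, size) pairs. The helper select_files is hypothetical, and skipping oversized files rather than raising an error is an assumption made for illustration, not documented provider behavior:

```python
MiB = 1024 * 1024


def select_files(
    files: list[tuple[str, int]],
    max_files: int = 20,
    max_file_size_bytes: int = 5 * MiB,
    max_total_size_bytes: int = 20 * MiB,
) -> tuple[list[str], list[str]]:
    """Return (files to analyze, files left out) under the given limits."""
    selected: list[str] = []
    omitted: list[str] = []
    total = 0
    for path, size in sorted(files):  # sort for a deterministic selection order
        too_many = len(selected) >= max_files
        too_big = size > max_file_size_bytes
        over_budget = total + size > max_total_size_bytes
        if too_many or too_big or over_budget:
            omitted.append(path)
            continue
        selected.append(path)
        total += size
    return selected, omitted


files = [
    ("reports/q1.pdf", 3 * MiB),
    ("reports/q2.pdf", 6 * MiB),  # larger than the per-file limit
    ("reports/q3.pdf", 4 * MiB),
]
chosen, left_out = select_files(files, max_files=2)
print(chosen, left_out)  # ['reports/q1.pdf', 'reports/q3.pdf'] ['reports/q2.pdf']
```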

@task.llm_schema_compare

The @task.llm_schema_compare decorator and LLMSchemaCompareOperator read schema metadata from two or more systems and ask an LLM to flag differences. The task returns a SchemaCompareResult as a dict, with fields such as compatible, mismatches, and summary. Each entry in mismatches is a SchemaMismatch with severity, column, types, and suggested actions.

You can supply databases in two ways:

  • db_conn_ids together with table_names: shorthand to compare the same logical table across connections. Each connection must resolve to a DbApiHook.
  • data_sources: a list of DataSourceConfig objects for more complex setups (for example object storage or catalog-backed sources combined with db_conn_ids).

from datetime import timedelta

from airflow.sdk import task


@task.llm_schema_compare(
    llm_conn_id="pydanticai_default",
    db_conn_ids=["postgres_source", "snowflake_target"],
    table_names=["customers"],
    context_strategy="full",  # "full" (default) or "basic"; full adds keys and indexes
    model_id="<your_provider>:<your_model>",
    agent_params=None,
    require_approval=False,
    approval_timeout=timedelta(minutes=10),
    allow_modifications=False,
)
def check_migration_readiness() -> str:
    return (
        "Compare schemas and flag breaking changes for nightly ETL. "
        "Suggest migration actions where they help."
    )


check_migration_readiness()

The string your callable returns is the comparison prompt. When you use LLMSchemaCompareOperator directly, pass that text with the prompt argument.

Additional parameters:

  • db_conn_ids and table_names: Use together for the same table name across multiple database connections.
  • data_sources: Optional list of DataSourceConfig for mixed database and object storage comparisons.
  • context_strategy: "basic" sends column names and types only; "full" (default) adds primary keys, foreign keys, and indexes to the context sent to the model.
  • system_prompt: The operator ships a default prompt that encodes cross-system type rules and severity levels. If you set system_prompt to any string, it replaces that default; import DEFAULT_SYSTEM_PROMPT and concatenate when you want to extend rather than replace.

Parameters inherited from LLMOperator (model_id, agent_params, and optional approval fields) match @task.llm. Downstream tasks can branch on comparison_result["compatible"] or inspect mismatches as shown in the Airflow documentation.
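Because the comparison result is a dict, a downstream task can branch on it with ordinary Python. The following sketch shows the kind of decision logic such a task could contain. The sample dict is made up, the severity values "critical" and "warning" and the task IDs run_nightly_etl and notify_data_team are hypothetical, and you should check the SchemaCompareResult model in the provider for the exact structure:

```python
from collections import Counter
from typing import Any


def choose_next_step(comparison_result: dict[str, Any]) -> str:
    """Pick a downstream task ID from a schema comparison result."""
    if comparison_result["compatible"]:
        return "run_nightly_etl"
    # Summarize mismatches by severity without assuming specific severity values.
    severities = Counter(m.get("severity", "unknown") for m in comparison_result["mismatches"])
    print(f"Blocking ETL: {comparison_result['summary']} ({dict(severities)})")
    return "notify_data_team"


sample_result = {  # made-up example shaped after the fields described above
    "compatible": False,
    "mismatches": [
        {"severity": "critical", "column": "customer_id"},
        {"severity": "warning", "column": "created_at"},
    ],
    "summary": "Target table changes the type of customer_id.",
}
print(choose_next_step(sample_result))  # notify_data_team
```

Inside a Dag, logic like this would typically live in a branching task that returns the chosen task ID.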