# Use DAG Factory to create dags

> Learn how to dynamically convert YAML files into Apache Airflow® dags with DAG Factory, an open source project that makes creating dags easy.

[DAG Factory](https://astronomer.github.io/dag-factory/latest/) is an open source tool managed by Astronomer that allows you to [dynamically generate](dynamically-generating-dags) [Apache Airflow®](https://airflow.apache.org/) dags from [YAML](https://yaml.org/). While Airflow dags are traditionally written exclusively in Python, DAG Factory makes it easy for people who don't know Python to use Airflow.

This guide provides a complete walkthrough of using the DAG Factory package to build production-ready pipelines in a modern Airflow project. You will learn how to install the library, structure your project according to best practices, and define a multi-task pipeline entirely in YAML. The example demonstrates powerful features like using the TaskFlow API, organizing tasks with task groups, and passing data between tasks, all from your configuration file. By the end, you'll be ready to apply these patterns to your own dynamic dags.

DAG Factory can be used with all Astronomer products and any Apache Airflow installation. To view the source code of the project, see the [dag-factory](https://github.com/astronomer/dag-factory) GitHub repository.

## When to use DAG Factory

While writing dags directly in Python is powerful and flexible, it's not always the most efficient approach for every use case. DAG Factory offers a configuration-driven alternative where you define the structure of your pipelines in YAML. This is particularly useful in several key scenarios:

- YAML is often more approachable than Python. DAG Factory allows team members like analysts or junior engineers, who may not be Airflow experts, to create and manage their own dags with a simplified, declarative syntax.
- If you have dozens of dags that follow the same pattern (like a standard extract-and-load job), DAG Factory is ideal. You can create a standard template and then generate numerous dags just by changing the parameters in a YAML file, which reduces code duplication and simplifies maintenance.
- DAG Factory helps you separate the *what* from the *how*. The YAML clearly defines the dag's structure and dependencies, while the underlying Python functions handle the actual business logic. This makes your dags easier to read at a glance and your Python code more modular and testable.

While DAG Factory offers significant advantages for some use cases, **there are scenarios where other approaches to dag authoring, such as writing dags directly in Python, are more appropriate**. When your data pipelines require complex conditional logic, branching, or sophisticated error handling that goes beyond what YAML can express cleanly, native Python is generally the better approach. Additionally, YAML-based dags can be more challenging to debug than native Python code, because you lose extensive logging and step-through debugging capabilities. Finally, your approach to orchestrating workflows should match your team environment, so consider existing expertise. While DAG Factory is a flexible product that supports all the main concepts of Airflow, newer features like [asset-aware scheduling](airflow-datasets) may work but are not as user-friendly or as well integrated as others.

## Assumed knowledge

To get the most out of this tutorial, you should have an understanding of:

- The [Airflow components](airflow-components) and how they work together.
- [Airflow fundamentals](get-started-with-airflow), such as writing dags and defining tasks.
- Basic understanding of [Airflow operators](what-is-an-operator).

## Prerequisites

- Python 3.9.0+
- The [Astro CLI](https://www.astronomer.io/docs/astro/cli/install-cli)

## Step 1: Initialize your Airflow project with the Astro CLI

First, create a new project directory and initialize an Astro project using the [Astro CLI](https://www.astronomer.io/docs/astro/cli/install-cli).

```bash
mkdir my-dag-factory-project && cd my-dag-factory-project
astro dev init
```

The `init` command creates a standard Airflow project structure. Since this tutorial focuses on DAG Factory, let's remove the example dag that's included by default.

```bash
rm dags/exampledag.py
```

Next, add the `dag-factory` library as a project dependency. Open `requirements.txt` and add the following line:

```
dag-factory==1.0.1
```

Now, start your local Airflow environment. The Astro CLI builds your project and installs `dag-factory` in the process.

```bash
astro dev start
```

Once the project is running, the Airflow UI opens automatically at `http://localhost:8080` and presents you with an empty dags list.

## Step 2: Organize the project

A key to building a maintainable and performant Airflow project is proper organization. While you could put all your YAML configs, Python scripts, and SQL files into the `dags/` folder, this can quickly become messy and put unnecessary strain on the dag processor. Astronomer recommends placing Python, SQL, and other scripts that are not dag definitions in the `include/` folder. Files in this folder are available to your dags but are not parsed by the Airflow dag processor, which reduces overhead and improves performance.

For this tutorial, we'll use a structure that is also a great starting point for real-world projects:

- `dags/`: This folder contains only the YAML configuration files and the Python script that generates the dags from them. This keeps all dag definitions encapsulated.
- `include/`: We will create a `tasks` subfolder here to hold the Python functions that our operators call. Any other supporting scripts (for example, SQL queries) would also live in subfolders within `include/`.

We will apply this principle in the next steps. For larger projects with a mix of dynamically generated and standard Python dags, consider organizing further. For example, you could create a `dags/configs` subfolder to hold all your DAG Factory YAML files, keeping them separate from your other `.py` dag files.

To separate our business logic from the rest of our orchestration logic, create a new folder named `tasks` inside `include`. There, we'll add the Python scripts that define the functions our YAML-based pipelines will call in the next steps.

```bash
mkdir -p include/tasks
```

## Step 3: Prepare functions

Our example dag will orchestrate a simple pipeline with both a `PythonOperator` defined through the [TaskFlow API](airflow-decorators) and a `BashOperator` defined as a [traditional operator](what-is-an-operator). DAG Factory supports both traditional operators and the modern TaskFlow API. This tutorial targets Airflow 3.x and uses the TaskFlow API decorator syntax whenever possible.

Before defining the dag in YAML, let's write the Python functions that our tasks will execute. Following our plan from Step 2, we'll place these functions in the `include/tasks/` folder.
Create a file named `include/tasks/basic_example_tasks.py` with the following content:

```python
def _extract_data() -> list[int]:
    return [1, 2, 3, 4]


def _store_data(processed_at: str, data_a: list[int], data_b: list[int]) -> None:
    print(f"Storing {len(data_a + data_b)} records at {processed_at}")
```

Design your Python functions to be small, self-contained, and independently testable, which aligns with best practices for both DAG Factory and general Airflow development.

## Step 4: Define a basic dag in YAML

Now we can create the YAML definition for our dag. Create a new YAML file in the `dags` folder named `basic_example.yml` and add the following content:

```yaml
basic_example_dag:
  default_args:
    owner: "astronomer"
    start_date: 2025-09-01
  description: "Basic example DAG"
  tags: ["demo", "etl"]
  schedule: "@hourly"
  task_groups:
    extract:
      tooltip: "data extraction"
  tasks:
    extract_data_from_a:
      decorator: airflow.sdk.task
      python_callable: include.tasks.basic_example_tasks._extract_data
      task_group_name: extract
    extract_data_from_b:
      decorator: airflow.sdk.task
      python_callable: include.tasks.basic_example_tasks._extract_data
      task_group_name: extract
    store_data:
      decorator: airflow.sdk.task
      python_callable: include.tasks.basic_example_tasks._store_data
      processed_at: "{{ logical_date }}"
      data_a: +extract_data_from_a
      data_b: +extract_data_from_b
      dependencies: [extract]
    validate_data:
      operator: airflow.providers.standard.operators.bash.BashOperator
      bash_command: "echo data is valid"
      dependencies: [store_data]
```

This YAML file defines the dag's structure and its tasks. Note how `+extract_data_from_a` and `+extract_data_from_b` are used to pass the return values of the extract tasks to the `store_data` task, and how Jinja templating (`{{ logical_date }}`) is used to pass the logical date.

## Step 5: Implement the generator script

The final step to make our dag appear is to create the Python script that Airflow will parse. This script uses the DAG Factory library to find our YAML file and generate the actual Airflow dag object from it. This approach gives you full control over the generation process and allows for extensive customization in advanced use cases.

Create a Python file named `dags/basic_example_dag_generation.py` with the following content:

```python
import os
from pathlib import Path

from dagfactory import load_yaml_dags

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "basic_example.yml")

load_yaml_dags(
    globals_dict=globals(),
    config_filepath=config_file,
)
```

Once the dag processor parses this file, your dag with the ID `basic_example_dag` appears in the UI. It has 4 tasks in its pipeline, 2 of them within a task group:

- `extract_data_from_a` and `extract_data_from_b`: Use the TaskFlow API to call the `_extract_data` function from our `include/tasks/basic_example_tasks.py` script. We create 2 separate tasks in this scenario; both return a list of numbers.
- `store_data`: Uses the TaskFlow API to call the `_store_data` function from our `include/tasks/basic_example_tasks.py` script. To pass parameters with this approach, just set them with the appropriate name directly in the YAML configuration. With `+extract_data_from_a` and `+extract_data_from_b`, we tell DAG Factory to reference the return values of the extract tasks. As shown in the example, you can also use Jinja templating, including [variables, macros and filters](https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html).
- `validate_data`: Uses the classic operator approach with the `BashOperator` to print the sentence _data is valid_.

The `load_yaml_dags` function is responsible for generating the dags. You can point it to a specific file, or to a folder that it will scan recursively for `.yml` or `.yaml` files. It uses the provided `globals_dict` to add the generated dags to the Airflow context. For more options, see the [official documentation](https://astronomer.github.io/dag-factory/latest/configuration/load_yaml_dags/).

![Basic generated dag example](file:71de8cab-d992-456b-9e92-801f315cfcb1)
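To confirm that the YAML was translated into the dag you expect, you can add a simple dag integrity test. The following is a minimal sketch, assuming a pytest setup such as `astro dev pytest` and the classic `DagBag` pattern; the test name is illustrative:

```python
from airflow.models import DagBag


def test_basic_example_dag_is_generated():
    # parse everything in the dags folder, including DAG Factory generator scripts
    dag_bag = DagBag(include_examples=False)

    # no file should fail to import
    assert dag_bag.import_errors == {}

    # the YAML definition should have produced the expected dag and task count
    dag = dag_bag.get_dag("basic_example_dag")
    assert dag is not None
    assert len(dag.tasks) == 4
```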
With this, you already know the basics of how to orchestrate a dag with YAML, including using task groups, passing data between tasks, using the TaskFlow API and classic operators, and setting basic dag attributes like the schedule or tags.

Dags defined with DAG Factory automatically receive the `dagfactory` tag. Also, if you select **Dag Docs** from the individual dag view, it shows the YAML file that created the dag by default, which is very useful for debugging.

![Dag docs showing YAML definition](file:c00bc15c-daa5-40db-8c46-b7fbc1c5031e)

This is a great starting point, and the following steps cover more advanced features to prepare your DAG Factory knowledge for real-world use cases.

## (Optional) Step 6: Asset-aware scheduling with YAML

Now, let's explore one of Airflow's most powerful features, [asset-aware scheduling](airflow-datasets), and how to implement it using DAG Factory. We will create two dags: a producer that updates an asset, and a consumer that runs whenever that asset is updated.

First, let's create the Python functions that our tasks will execute. These functions fetch data from an API, save it to a file, and then read it back. Create a new file named `include/tasks/asset_example_tasks.py` with the following content:

```python
import json
import tempfile

import requests


def _get_iss_coordinates_file_path() -> str:
    return tempfile.gettempdir() + "/iss_coordinates.txt"


def _update_iss_coordinates() -> None:
    placeholder = {"latitude": "0.0", "longitude": "0.0"}
    try:
        response = requests.get("http://api.open-notify.org/iss-now.json", timeout=5)
        response.raise_for_status()
        data = response.json()
        coordinates = data.get("iss_position", placeholder)
    except Exception:
        coordinates = placeholder

    with open(_get_iss_coordinates_file_path(), "w") as f:
        f.write(json.dumps(coordinates))


def _read_iss_coordinates() -> None:
    path = _get_iss_coordinates_file_path()
    with open(path, "r") as f:
        print("::group::ISS Coordinates")
        print(f.read())
        print("::endgroup::")
```

The `_update_iss_coordinates` function retrieves data from an API and writes it to a file, while `_read_iss_coordinates` reads this file and prints the content to a dedicated log group.
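Because these are plain Python functions, you can exercise them outside of Airflow before wiring them into a dag, in line with the best practice of keeping task logic small and independently testable. A quick, optional check might look like the following sketch, which assumes you run it from the project root so that `include` is importable:

```python
from include.tasks.asset_example_tasks import (
    _read_iss_coordinates,
    _update_iss_coordinates,
)

if __name__ == "__main__":
    # fetch the current ISS position (or fall back to the placeholder coordinates)
    _update_iss_coordinates()
    # print the coordinates that were just written to the temporary file
    _read_iss_coordinates()
```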
Now that we have our Python logic, we can define the two dags that will orchestrate it. Create a new YAML file at `dags/asset_example.yml`:

```yaml
default:
  start_date: 2025-09-01

update_iss_coordinates:
  schedule: "@daily"
  tasks:
    update_coordinates:
      decorator: airflow.sdk.task
      python_callable: include.tasks.asset_example_tasks._update_iss_coordinates
      outlets:
        - __type__: airflow.sdk.Asset
          name: "iss_coordinates"

process_iss_coordinates:
  schedule:
    - __type__: airflow.sdk.Asset
      name: "iss_coordinates"
  tasks:
    read_coordinates:
      decorator: airflow.sdk.task
      python_callable: include.tasks.asset_example_tasks._read_iss_coordinates
```

This single YAML file defines both the `update_iss_coordinates` (_producer_) and `process_iss_coordinates` (_consumer_) dags. For the producing dag, we define an outlet of type `airflow.sdk.Asset` and name it `iss_coordinates`. The consuming dag then uses this same asset identifier for its `schedule` attribute, which creates the dependency.

Also, take note of the top-level `default` block in the YAML. **This configuration affects all the dags defined in the YAML file**, allowing you to share standard settings for improved consistency, maintainability, and simplicity.

Finally, to generate these dags in Airflow, we need to create a corresponding generator script. Create a new file named `dags/asset_example_dag_generation.py` with the following content:

```python
import os
from pathlib import Path

from dagfactory import load_yaml_dags

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "asset_example.yml")

load_yaml_dags(
    globals_dict=globals(),
    config_filepath=config_file,
)
```

And that's it! Once the dag processor parses this file, you will see two new dags in the Airflow UI, connected by the `iss_coordinates` asset. When you run the `update_iss_coordinates` dag, the `process_iss_coordinates` dag is triggered automatically upon its completion.

![Asset consumer task logs](file:085d9af6-afaf-4f05-b760-22e881b438cc)

For a simpler approach to creating one dag with one task updating an asset, you could use the [@asset syntax](airflow-datasets#asset-syntax), adding `@asset(schedule="@daily")` directly to the `_update_iss_coordinates` function in your Python file. This would allow you to remove the `update_iss_coordinates` dag definition from your YAML entirely. This tutorial defines both in YAML to fully demonstrate how DAG Factory handles asset producers and consumers.

## (Optional) Step 7: Alternative YAML loading

In the previous steps, we used a dedicated Python dag generation script for each dag to parse the YAML with DAG Factory. This approach is useful for maximum control over the generation process and for avoiding unexpected workload when teams work with many YAML files. However, it also adds complexity. The `load_yaml_dags` function therefore also supports a more pragmatic approach: parsing all YAML files in your dags folder recursively.

To illustrate this, delete the two generator scripts `dags/basic_example_dag_generation.py` and `dags/asset_example_dag_generation.py`. Then create a new file `dags/dag_generation.py`:

```python
# keep import to ensure the dag processor parses the file
from airflow.sdk import dag

from dagfactory import load_yaml_dags

load_yaml_dags(globals_dict=globals())
```

In this particular case, we need to add the `dag` import as an indicator for Airflow not to skip this file during parsing. You will notice that the result is the same as before, and any additional YAML files you add are now processed automatically.

When searching for dags inside the dag bundle, Airflow only considers Python files that contain the strings `airflow` and `dag` (case-insensitively) as an optimization. Because of this optimization, you might need to add the `dag` import to ensure your file is parsed. To consider all Python files instead, disable the `DAG_DISCOVERY_SAFE_MODE` configuration flag.

If you want to store your YAML definitions outside the dags folder, you can override the `dags_folder` argument when calling the `load_yaml_dags` function to set a custom folder to process recursively.
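For example, a single generator script could point DAG Factory at a dedicated config folder. The following is a minimal sketch; the `include/dag_configs` path is illustrative and not part of the tutorial project:

```python
# keep import to ensure the dag processor parses the file
from airflow.sdk import dag

from dagfactory import load_yaml_dags

# illustrative path: YAML configs stored outside the dags folder
load_yaml_dags(
    globals_dict=globals(),
    dags_folder="/usr/local/airflow/include/dag_configs",
)
```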
## (Optional) Step 8: Configuration and inheritance

As you create more dags, you'll want to avoid repeating the same configuration. DAG Factory includes powerful features for centralized configuration and inheritance to help you keep your dag definitions clean, consistent, and easy to maintain across your project. This feature allows you to set default values for both dag-level arguments (like `schedule`) and task-level arguments (like `retries` via `default_args`).

In our `dags/asset_example.yml` file, you already discovered one way to configure dags in a centralized way within the YAML definition:

```yaml
default:
  start_date: 2025-09-01

update_iss_coordinates:
  # ...

process_iss_coordinates:
  # ...
```

With this approach, both dags, `update_iss_coordinates` and `process_iss_coordinates`, use the `start_date` from the `default` block.

This feature becomes even more powerful when you use global defaults in combination with inheritance. To illustrate this, let's imagine a real-world scenario with a set of company-wide data pipeline standards:

* All dags should have a default `start_date` of `2025-09-01`.
* All dags should be owned by `astronomer`, unless they belong to a specific department.
* All tasks should have 2 retries by default.
* The default schedule for all dags should be daily at midnight (`@daily`), unless specified otherwise.

DAG Factory automatically looks for a file named `defaults.yml` in your dags folder and applies its configuration to all dags within that folder and its subfolders. This creates a single source of truth for your global defaults. `load_yaml_dags` uses the same default path for both the configurations and the YAML files: the path set as `dags_folder`. You can override only the path where DAG Factory looks for configurations by setting the `defaults_config_path` parameter.

To implement our company standards, create a new file at `dags/defaults.yml` with the following content:

```yml
schedule: "@daily" # dag-specific arguments at root level
default_args:
  start_date: 2025-09-01
  owner: "astronomer"
  retries: 2
```

The real power of this feature comes from inheritance. DAG Factory applies `defaults.yml` files hierarchically: a `defaults.yml` in a subfolder inherits from its parent and can override any of the parent's settings.

Let's apply this to our scenario. We want to override the default `owner` for our Marketing and Finance departments, and also change the default `schedule` for the Marketing department so its dags run at 1 AM rather than midnight.
First, let's create the folder structure:

```text
airflow
└── dags
    ├── defaults.yml
    ├── marketing
    │   ├── defaults.yml
    │   └── marketing_dag.yml
    └── finance
        ├── defaults.yml
        └── finance_dag.yml
```

Now, create `dags/marketing/defaults.yml` to set a new `schedule` and `owner`:

```yml
schedule: "0 1 * * *"
default_args:
  owner: "astronomer-marketing"
```

And for the Finance department, create `dags/finance/defaults.yml` to override only the `owner`:

```yml
default_args:
  owner: "astronomer-finance"
```

Now that our defaults are in place, creating the actual dags is incredibly simple and clean. Create `dags/marketing/marketing_dag.yml`:

```yml
marketing_dag:
  tasks:
    some_process:
      operator: airflow.providers.standard.operators.bash.BashOperator
      bash_command: "echo processing data"
```

And similarly, create `dags/finance/finance_dag.yml`:

```yml
finance_dag:
  tasks:
    some_process:
      operator: airflow.providers.standard.operators.bash.BashOperator
      bash_command: "echo processing data"
```

Notice how concise these definitions are. We don't need to specify `start_date`, `owner`, or `retries` because they are all handled by our layered `defaults.yml` files. This allows you to write minimal dag configurations while maintaining centralized control over your project's standards.

In the Airflow UI, you will see two new dags, each with a different set of inherited properties:

* `marketing_dag`: Inherits the `schedule` (`0 1 * * *`) and `owner` (`astronomer-marketing`) from its local `defaults.yml`, and `retries` from the global `defaults.yml`.
* `finance_dag`: Inherits the `owner` (`astronomer-finance`) from its local `defaults.yml`, and both the `schedule` (`@daily`) and `retries` from the global `defaults.yml`.

![Dags with inherited properties](file:85009e6f-ac8f-4351-8818-2bd4075b7119)

If any `defaults.yml` files are inside your `dags_folder`, DAG Factory might try to parse them as dags, which can cause errors in your task logs. To prevent this, keep `dags_folder` and `defaults_config_path` separate. Configuration inheritance still works as expected, and these errors are non-critical.

## Advanced usage: Dynamic task mapping

DAG Factory also supports [dynamic task mapping](dynamic-tasks) to dynamically generate parallel tasks at runtime. The following example shows how to apply this principle using the TaskFlow API.

Let's assume we have the following Python functions defined in `include/tasks/dtm_tasks.py`:

```python
def _generate_data():
    return [1, 2, 3, 4, 5]


def _process_data(processing_date, value):
    print(f"Processing {value} at {processing_date}")
```

We can now simply reference arguments under `partial` and `expand` in our YAML to let DAG Factory apply dynamic task mapping:

```yaml
dtm_example:
  default_args:
    owner: "astronomer"
    start_date: 2025-09-01
  schedule: "@hourly"
  tasks:
    generate_data:
      decorator: airflow.sdk.task
      python_callable: include.tasks.dtm_tasks._generate_data
    process_data:
      decorator: airflow.sdk.task
      python_callable: include.tasks.dtm_tasks._process_data
      partial:
        processing_date: "{{ logical_date }}"
      expand:
        value: +generate_data
      dependencies: [generate_data]
```

With this, DAG Factory uses the output of `generate_data` to generate parallel task instances of `process_data`.

![YAML-generated dynamic tasks.](file:b82ca5eb-e4df-4b1d-98c9-9c2aa196b691)
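If you are wondering how the `partial` and `expand` keys relate to regular Airflow code, the following sketch shows roughly equivalent native TaskFlow syntax. The dag id `dtm_example_native` and the wrapper tasks are illustrative only and not part of the tutorial project:

```python
import pendulum
from airflow.sdk import dag, task

from include.tasks.dtm_tasks import _generate_data, _process_data


@dag(schedule="@hourly", start_date=pendulum.datetime(2025, 9, 1))
def dtm_example_native():
    @task
    def generate_data():
        return _generate_data()

    @task
    def process_data(processing_date, value):
        _process_data(processing_date, value)

    # `partial` pins the arguments shared by all mapped task instances,
    # while `expand` fans out over the upstream task's return value.
    process_data.partial(processing_date="{{ logical_date }}").expand(
        value=generate_data()
    )


dtm_example_native()
```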
## Advanced usage: Dynamic YAML generation

The examples above show how to use DAG Factory to create dags based on static YAML files. For use cases where you'd like to create several dags with a similar structure, you can create them [dynamically](dynamically-generating-dags) based on a template YAML file to avoid code duplication. Creating a dag dynamically with DAG Factory simply means that you use Python code to create the YAML configurations instead of writing them manually.

There are two files that you need:

- A **template YAML file** that contains the structure of the dags you want to create, with placeholders for the values that will change.
- A **Python script** that creates the DAG Factory YAML file by replacing the placeholders in the template YAML file with the actual values.

Since Airflow already uses Jinja2 internally, we can leverage this library for a more robust generation process.

The template YAML file provides the structure for all the dags you will generate dynamically, with placeholders for values that vary between the dags. Create a file called `include/template.yml`:

```text
{{ dag_id }}:
  schedule: "{{ schedule }}"
  tasks:
    task_1:
      operator: airflow.providers.standard.operators.bash.BashOperator
      bash_command: "{{ bash_command_task_1 }}"
    task_2:
      operator: airflow.providers.standard.operators.bash.BashOperator
      bash_command: "{{ bash_command_task_2 }}"
      dependencies: [task_1]
```

The Python script reads the template YAML file, replaces the placeholders with the actual values, and writes the resulting YAML file to the `dags` directory. Place this script in the top level of your project for now. You can run it manually to generate your dags for local development, or automatically as part of your CI/CD pipeline.

```python
from pathlib import Path

import yaml
from jinja2 import Environment, FileSystemLoader

TEMPLATE_DIR = "include"
TEMPLATE_NAME = "template.yml"
OUTPUT_FILE = "dags/dynamic_dags.yml"

TEMPLATE_VARIABLES = [
    {
        "dag_id": "example_1",
        "schedule": "@daily",
        "bash_command_task_1": "echo task 1 from example 1",
        "bash_command_task_2": "echo task 2 from example 1",
    },
    {
        "dag_id": "example_2",
        "schedule": "@weekly",
        "bash_command_task_1": "echo task 1 from example 2",
        "bash_command_task_2": "echo task 2 from example 2",
    },
]


def generate_dags_from_template():
    # set up Jinja2
    env = Environment(loader=FileSystemLoader(TEMPLATE_DIR), autoescape=True)
    template = env.get_template(TEMPLATE_NAME)

    # render dags from the template
    all_dags = {}
    for variables in TEMPLATE_VARIABLES:
        rendered_yaml_str = template.render(variables)
        dag_config = yaml.safe_load(rendered_yaml_str)
        all_dags.update(dag_config)

    # write to file
    output_path = Path(OUTPUT_FILE)
    with open(output_path, "w") as f:
        yaml.dump(all_dags, f, sort_keys=False)

    print(f"Successfully generated {len(TEMPLATE_VARIABLES)} dags into {OUTPUT_FILE}")


if __name__ == "__main__":
    generate_dags_from_template()
```

As a result, you will see the dynamically generated `dags/dynamic_dags.yml` file:

```yaml
example_1:
  schedule: '@daily'
  tasks:
    task_1:
      operator: airflow.providers.standard.operators.bash.BashOperator
      bash_command: echo task 1 from example 1
    task_2:
      operator: airflow.providers.standard.operators.bash.BashOperator
      bash_command: echo task 2 from example 1
      dependencies:
      - task_1
example_2:
  schedule: '@weekly'
  tasks:
    task_1:
      operator: airflow.providers.standard.operators.bash.BashOperator
      bash_command: echo task 1 from example 2
    task_2:
      operator: airflow.providers.standard.operators.bash.BashOperator
      bash_command: echo task 2 from example 2
      dependencies:
      - task_1
```
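If the script runs as part of your CI/CD pipeline, a small sanity check can catch template or rendering issues before the file reaches Airflow. The following is a minimal sketch, assuming the generation script has already run:

```python
import yaml

# the rendered file should contain exactly the dags defined in TEMPLATE_VARIABLES
with open("dags/dynamic_dags.yml") as f:
    generated = yaml.safe_load(f)

assert set(generated) == {"example_1", "example_2"}
assert all("tasks" in config for config in generated.values())
```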
## Conclusion

In this tutorial, you've journeyed from defining a single dag in a YAML file to building a complete framework for dynamically generating your pipelines. You've learned how to:

* Define dags, tasks, and task groups using a simple, declarative syntax.
* Pass data between tasks and use the TaskFlow API.
* Implement Airflow features like asset-aware scheduling and dynamic task mapping.
* Manage configuration at scale using hierarchical `defaults.yml` files for inheritance.
* Dynamically generate your YAML configurations using a templating engine.

Whether your goal is to empower analysts, standardize repetitive ETL jobs, or simply separate your pipeline's structure from its logic, DAG Factory provides a robust, configuration-driven approach to Airflow development.

To continue your journey, explore the official [DAG Factory repository](https://github.com/astronomer/dag-factory/tree/main/dev/dags), which contains many more examples and advanced use cases. You now have all the tools to start building your own dynamic dags.