Native support for Source Node Rendering in Cosmos
Over the past year at GlossGenius, we’ve been using Cosmos to orchestrate our dbt-core workflows with Apache Airflow. Cosmos has been a game-changer, providing seamless integration between the two tools and simplifying how we manage and execute our data workflows. Features like rerunning specific tasks, refreshing downstream models, and leveraging Airflow’s advanced capabilities have made our lives easier.
But as we became more familiar with Cosmos, we noticed an opportunity to improve how it handled source freshness checks, which verify that your source data is healthy and meets its SLAs.
Background: Challenges with source nodes before Cosmos 1.6
Stale, non-blocking sources could cause unnecessary failures
Initially, we followed the standard approach of running dbt source freshness daily after our dbt build DAG. However, this approach wasn’t ideal:
- Running dbt source freshness checks all sources, regardless of their importance.
- Checking for freshness before you build your models can create situations where:
  - Source A might be stale, but it doesn’t power any critical models.
  - Source B is fresh and supports the company’s core models.
  Yet a failure in Source A would block downstream tasks unnecessarily, delaying critical pipelines.
- Checking for freshness after you build your models can waste compute if some models’ sources turned out to be stale. Moreover, following the principle that no data is better than wrong data, we prefer to avoid running models powered by stale sources altogether.
Fig 1. Running the freshness check after Cosmos tasks.
Custom rendering exposed compute inefficiencies
When Cosmos introduced custom rendering for nodes like sources and exposures, we saw an opportunity to check freshness at the source level. However, there was a limitation:
- All sources were being rendered into tasks, even those without freshness checks.
- These tasks were running dbt commands that effectively did nothing, wasting compute and cluttering our DAGs.
- An open issue (#630) at the time suggested rendering sources without any checks as EmptyOperators, which, per the Airflow docs, “are evaluated by the scheduler but never processed by the executor”.
The need for native support
While Cosmos already had patterns in place for models, tests, snapshots and more, it lacked native support for sources. To address this, I contributed a feature with the help of Pankaj Singh, extending the existing pattern to render sources natively.
How does native source rendering work?
Cosmos has multiple ways of rendering dbt DAGs into Airflow DAGs, including:
- Reading directly from a manifest.json file stored locally or in cloud storage.
- Running dbt ls while Airflow compiles the DAG to extract node names, dependencies, types, and other metadata.
To enable custom behavior for source nodes based on freshness checks, it was essential for Cosmos to extract freshness metadata consistently across all supported parsing methods.
- Manifest files already include freshness values.
- dbt ls does not return freshness by default.
This required updating the dbt ls parsing method to include freshness data. Luckily, dbt ls supports a rich set of arguments, allowing customization of the returned values. This update was straightforward but restricted the feature to dbt versions 1.5 and above.
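To illustrate why both parsing paths can feed the same freshness metadata, here is a minimal sketch; it is not Cosmos’s actual parser. It assumes dbt ls was run with JSON output and with freshness among the requested output keys, so each stdout line describing a node is one JSON object, and that a parsed manifest.json exposes sources under its top-level sources key. The function names and the sample data are hypothetical.

```python
import json
from typing import Any, Dict, Iterable, Optional

Freshness = Optional[Dict[str, Any]]


def freshness_from_dbt_ls(lines: Iterable[str]) -> Dict[str, Freshness]:
    """Map source unique_id -> freshness from dbt ls JSON output (one object per line)."""
    result: Dict[str, Freshness] = {}
    for raw in lines:
        line = raw.strip()
        if not line.startswith("{"):
            # Skip blank lines or log lines that are not JSON objects.
            continue
        node = json.loads(line)
        if node.get("resource_type") == "source":
            result[node["unique_id"]] = node.get("freshness")
    return result


def freshness_from_manifest(manifest: Dict[str, Any]) -> Dict[str, Freshness]:
    """Map source unique_id -> freshness from a parsed manifest.json."""
    return {
        unique_id: node.get("freshness")
        for unique_id, node in manifest.get("sources", {}).items()
    }


# Hypothetical data shaped like the raw_orders source used in this post.
ORDERS_ID = "source.altered_jaffle_shop.postgres_db.raw_orders"
ORDERS_FRESHNESS = {
    "warn_after": {"count": 3650, "period": "day"},
    "error_after": {"count": None, "period": None},
    "filter": None,
}
ls_lines = [
    "Running with dbt=1.8.7",  # hypothetical non-JSON log line, skipped by the parser
    json.dumps({"unique_id": ORDERS_ID, "resource_type": "source", "freshness": ORDERS_FRESHNESS}),
]
manifest = {"sources": {ORDERS_ID: {"resource_type": "source", "freshness": ORDERS_FRESHNESS}}}

# Both parsing paths should yield the same freshness metadata.
assert freshness_from_dbt_ls(ls_lines) == freshness_from_manifest(manifest)
```

Normalizing both inputs to the same unique_id to freshness mapping is what lets downstream rendering logic ignore which load method produced the graph.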
Key features
Key features of native source rendering include:
Freshness checks only when needed
- A new attribute, has_freshness, has been added to the DbtNode class.
- What it does:
  - True: indicates the source requires a freshness check.
  - False: indicates the source doesn’t require a freshness check.
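One subtlety when deriving a flag like this is that a source can carry a freshness block whose thresholds are all null, like the error_after entry in the sources.json sample in this post, which should not count as a check. The sketch below is an assumption about how such a flag could be computed, not Cosmos’s code: it treats a threshold as configured only when both count and period are set, and SourceNode is a hypothetical, trimmed-down stand-in for DbtNode that computes has_freshness as a property for brevity.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


def has_effective_freshness(freshness: Optional[Mapping[str, Any]]) -> bool:
    """True only if warn_after or error_after has both a count and a period."""
    if not freshness:
        return False
    for key in ("warn_after", "error_after"):
        rule = freshness.get(key) or {}
        if rule.get("count") is not None and rule.get("period") is not None:
            return True
    return False


@dataclass
class SourceNode:
    """Hypothetical stand-in for Cosmos's DbtNode (not its real definition)."""

    unique_id: str
    freshness: Optional[Mapping[str, Any]] = None
    has_test: bool = False

    @property
    def has_freshness(self) -> bool:
        return has_effective_freshness(self.freshness)


NULL_RULE = {"count": None, "period": None}
orders = SourceNode(
    "source.altered_jaffle_shop.postgres_db.raw_orders",
    freshness={"warn_after": {"count": 3650, "period": "day"}, "error_after": NULL_RULE, "filter": None},
    has_test=True,
)
customers = SourceNode(
    "source.altered_jaffle_shop.postgres_db.raw_customers",
    freshness={"warn_after": NULL_RULE, "error_after": NULL_RULE, "filter": None},
)
print(orders.has_freshness, customers.has_freshness)  # True False
```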
Support for multiple modes
- all: Cosmos renders all sources in the dbt project. It uses three different node types for this:
  - EmptyOperator: for sources that do not have freshness checks.
  - DbtSourceOperator: for sources that have freshness checks.
  - DbtTestOperator: for sources that have tests (rendered in addition to the source task).
- none (default): no sources are rendered automatically. Custom converters can still be used.
- with_tests_or_freshness:
  - Renders only sources that have either tests or freshness checks.
  - Ideal for large dbt projects with many sources, where the all option could render thousands of tasks.
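The per-mode rules, as reflected in the results of the example later in this post, can be condensed into a small lookup function. This is a simplified model written for illustration, not Cosmos’s internal code: Mode and operators_for_source are hypothetical names, and operators are represented by their class names as strings. Note that, per those results, a source with tests but no freshness check is rendered as a DbtSourceOperator in with_tests_or_freshness mode but as an EmptyOperator in all mode.

```python
from enum import Enum
from typing import List


class Mode(Enum):
    """Illustrative mirror of the three SourceRenderingBehavior options."""

    ALL = "all"
    NONE = "none"
    WITH_TESTS_OR_FRESHNESS = "with_tests_or_freshness"


def operators_for_source(mode: Mode, has_freshness: bool, has_test: bool) -> List[str]:
    """Return the operator class names a source would render as ([] = not rendered)."""
    if mode is Mode.NONE:
        return []
    if mode is Mode.WITH_TESTS_OR_FRESHNESS and not (has_freshness or has_test):
        return []
    # In "all" mode, sources without freshness checks become no-op EmptyOperators.
    if mode is Mode.ALL and not has_freshness:
        tasks = ["EmptyOperator"]
    else:
        tasks = ["DbtSourceOperator"]
    # Source tests are rendered as a separate test task.
    if has_test:
        tasks.append("DbtTestOperator")
    return tasks


# The three example tables from this post: (name, has_freshness, has_test)
EXAMPLE = [("raw_customers", False, False), ("raw_payments", False, True), ("raw_orders", True, True)]
for mode in Mode:
    for name, fresh, tested in EXAMPLE:
        print(mode.value, name, operators_for_source(mode, fresh, tested))
```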
New rendered template field: freshness
- Includes the sources.json generated by dbt when running dbt source freshness.
- Provides detailed information about the freshness checks for debugging or analysis.
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/sources/v3.json",
"dbt_version": "1.8.7",
"generated_at": "2025-03-26T18:23:52.753220Z",
"invocation_id": "c04024de-2b85-4bb8-b236-5d4ba7206382",
"env": {}
},
"results": [
{
"unique_id": "source.altered_jaffle_shop.postgres_db.raw_orders",
"max_loaded_at": "2018-04-09T00:00:00+00:00",
"snapshotted_at": "2025-03-26T18:23:52.685384+00:00",
"max_loaded_at_time_ago_in_s": 219781432.685384,
"status": "pass",
"criteria": {
"warn_after": {
"count": 3650,
"period": "day"
},
"error_after": {
"count": null,
"period": null
},
"filter": null
},
"adapter_response": {
"_message": "SELECT 1",
"code": "SELECT",
"rows_affected": 1
},
"timing": [
{
"name": "compile",
"started_at": "2025-03-26T18:23:52.616401Z",
"completed_at": "2025-03-26T18:23:52.616403Z"
},
{
"name": "execute",
"started_at": "2025-03-26T18:23:52.616556Z",
"completed_at": "2025-03-26T18:23:52.688815Z"
}
],
"thread_id": "Thread-1",
"execution_time": 0.07286596298217773
}
],
"elapsed_time": 0.6606731414794922
}
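Because the freshness field exposes the raw sources.json, it can be post-processed, for example to explain why a source passed or failed. A minimal sketch, assuming you already have the JSON as a string (how you fetch the rendered field from Airflow is out of scope): it reports each result and re-derives the status from the criteria using our reading of dbt’s rule, where error_after is checked before warn_after and a threshold is breached when the age strictly exceeds it. The helper names are hypothetical.

```python
import json
from typing import Any, Dict, Optional

# Seconds per freshness period; dbt periods are minute, hour, or day.
PERIOD_SECONDS = {"minute": 60, "hour": 3600, "day": 86400}


def threshold_seconds(rule: Optional[Dict[str, Any]]) -> Optional[float]:
    """Convert a {"count": ..., "period": ...} rule to seconds, or None if unset."""
    if not rule or rule.get("count") is None or rule.get("period") is None:
        return None
    return rule["count"] * PERIOD_SECONDS[rule["period"]]


def expected_status(result: Dict[str, Any]) -> str:
    """Re-derive pass/warn/error from one sources.json result entry."""
    age = result["max_loaded_at_time_ago_in_s"]
    error_s = threshold_seconds(result["criteria"].get("error_after"))
    warn_s = threshold_seconds(result["criteria"].get("warn_after"))
    if error_s is not None and age > error_s:
        return "error"
    if warn_s is not None and age > warn_s:
        return "warn"
    return "pass"


def summarize(sources_json: str) -> Dict[str, Dict[str, Any]]:
    """Per source: reported status, re-derived status, and data age in days."""
    payload = json.loads(sources_json)
    return {
        r["unique_id"]: {
            "status": r["status"],
            "expected": expected_status(r),
            "age_days": r["max_loaded_at_time_ago_in_s"] / 86400,
        }
        for r in payload["results"]
    }


# Trimmed copy of the sample result shown above.
SAMPLE = """
{"results": [{
  "unique_id": "source.altered_jaffle_shop.postgres_db.raw_orders",
  "max_loaded_at_time_ago_in_s": 219781432.685384,
  "status": "pass",
  "criteria": {"warn_after": {"count": 3650, "period": "day"},
               "error_after": {"count": null, "period": null},
               "filter": null}
}]}
"""
for uid, info in summarize(SAMPLE).items():
    print(uid, info["status"], info["expected"], round(info["age_days"], 1))
```

For the sample above, raw_orders is about 2,544 days old, comfortably inside its 3,650-day warn_after window, so both the reported and the re-derived status are pass.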
Support for source tests
- The feature fully integrates with dbt source tests.
Example: Comparing modes
Here’s an example showcasing the three native source rendering modes:
Source configuration
version: 2
sources:
  - name: postgres_db
    database: "{{ env_var('POSTGRES_DB') }}"
    schema: "{{ env_var('POSTGRES_SCHEMA') }}"
    tables:
      - name: raw_customers
      - name: raw_payments
        columns:
          - name: id
            tests:
              - unique
              - not_null
      - name: raw_orders
        columns:
          - name: id
            tests:
              - unique
              - not_null
        freshness:
          warn_after:
            count: 3650
            period: day
        loaded_at_field: CAST(order_date AS TIMESTAMP)
As you can see, we have three different source tables from the same database:
- raw_customers has neither tests nor freshness checks.
- raw_payments has tests.
- raw_orders has both tests and freshness checks.
Cosmos DAG setup
from datetime import datetime
from pathlib import Path
from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
# New in 1.6: import SourceRenderingBehavior options
from cosmos.constants import SourceRenderingBehavior
from cosmos.profiles import PostgresUserPasswordProfileMapping
# define the dbt profile
airflow_db = ProfileConfig(
    profile_name="airflow_db",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_metadata_db",
        profile_args={"schema": "dbt"},
    ),
)
jaffle_shop_path = Path("/usr/local/airflow/dbt/jaffle_shop")
dbt_executable = Path("/usr/local/airflow/dbt_venv/bin/dbt")
# define the execution configuration
venv_execution_config = ExecutionConfig(
    dbt_executable_path=str(dbt_executable),
)
# create a DAG from a dbt-core project
simple_dag = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,
    schedule="@daily",  # Airflow 2.4+; older examples use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    dag_id="simple_dag",
    # New in 1.6: set the render config to include source nodes
    render_config=RenderConfig(
        # swap between ALL, NONE or WITH_TESTS_OR_FRESHNESS
        source_rendering_behavior=SourceRenderingBehavior.ALL,
    ),
)
Results
Mode: all
- raw_customers: rendered as an EmptyOperator.
- raw_payments: rendered as an EmptyOperator, and its tests rendered as a DbtTestOperator.
- raw_orders: rendered as a DbtSourceOperator, and its tests rendered as a DbtTestOperator.
Fig 2. DAG graph view for source rendering behaviour all.
Mode: none
- No sources are rendered.
Fig 3. DAG graph view for source rendering behaviour none.
Mode: with_tests_or_freshness
- raw_customers: not rendered.
- raw_payments: rendered as a DbtSourceOperator, and its tests rendered as a DbtTestOperator.
- raw_orders: rendered as a DbtSourceOperator, and its tests rendered as a DbtTestOperator.
Fig 4. DAG graph view for source rendering behaviour with_tests_or_freshness.
Getting Started
To use this feature:
- Upgrade to Cosmos version 1.6 or higher. Refer to the Cosmos getting started documentation.
- Enable the ALL or WITH_TESTS_OR_FRESHNESS option for native source node rendering. Check the Source rendering documentation for more details.
- Test the feature with your dbt projects.
Conclusion
This new feature in Cosmos is a step toward making dbt and Airflow workflows more efficient and tailored to real-world use cases. By rendering source nodes natively, we can reduce wasted compute, avoid running models with stale data, and build workflows that align better with business priorities.
I’m excited to see how this feature will be used by the community and what further improvements it might inspire. If you have any questions, feedback, or ideas, join the conversation in the Airflow Slack in the airflow-dbt channel.