Webinar Recap

Introducing Astro: Data Centric DAG Authoring

By Daniel Imberman

This webinar introduces data-centric DAG writing and the new features we built to make it easier. Let’s find out what we created!

Why Astro?

After the release of Airflow 2.0 in 2020, we made some foundational improvements, including the HA Scheduler, the TaskFlow API, the separation of operators into independent provider packages, and other infrastructure-level changes. The feedback was that the tool worked well for Airflow admins, but that we could also create an experience better fitted to what data engineers, data scientists, and data analysts typically do. So we went back to the “drawing board”, performed an in-depth analysis of the DAG-writing process, and continued simplifying it.

DAG writing today

  • It requires a decent understanding of both Python and the language of the target system (e.g. Spark or SQL)
  • It is full of Airflow constructs (operators, dependencies, etc.) that are unrelated to the underlying data
  • Task dependencies are defined explicitly rather than being driven by the data the tasks are moving

Next-generation DAG writing

  • It feels as close as possible to writing regular Python
  • People with no prior knowledge of Airflow can pick it up quickly and understand what’s going on throughout the process
  • Tasks interact with each other in rich and meaningful ways


Astro – a New DAG-Writing Experience

Astro combines several significant new features and improvements that give you more control over the DAG-writing process and make it more intuitive. The idea behind this functionality is to help every user get onboarded faster and with less effort.

Creating a Python + SQL Story


When it comes to a better Airflow SQL story, we wanted to handle the common cases of ELT: loading data, transforming tables, and appending to or merging existing tables.

Another important aspect was the promotion of best practices. Knowing that poorly templated SQL can lead to SQL injection and security issues, we made it as easy as possible to template your SQL in a way that’s safe. Hot swapping minimizes downtime: instead of merging into a production table, you can merge into a temp table and swap the two at the last second.

We also wanted to stay Pythonic and create a system where SQL tables feel like Python objects that can be passed between Python functions, allowing you to write an ELT pipeline without having to worry about the internals and what’s happening underneath.
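As a preview, here is a minimal sketch of what that looks like in practice. It is not from the webinar; the table name, connection ID, and the Table import path are assumptions that may differ across astro versions.

from astro import sql as aql
from astro.sql.table import Table  # assumption: import path may differ by version

# Two hypothetical transforms: the table returned by one is passed
# straight into the next, like any other Python object.
@aql.transform
def recent_orders(orders_table: Table):
    return "SELECT * FROM {orders_table} WHERE purchase_date >= '2021-12-01'"

@aql.transform
def aggregate(recent: Table):
    return "SELECT customer_id, count(*) AS purchase_count FROM {recent} GROUP BY customer_id"

# Assuming a DAG object named `dag`, as in the examples below
with dag:
    recent = recent_orders(orders_table=Table(table_name="orders", conn_id="my_postgres_conn"))
    summary = aggregate(recent=recent)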

The Transform Function

@aql.transform


With the aql transform function, you only need to define the context at data load time; going from SQL to Python and from Python back to SQL then happens automatically.

@aql.transform
def aggregate_orders(orders_table: Table):
    return """
        SELECT customer_id, count(*) AS purchase_count
        FROM {orders_table}
        WHERE purchase_date >= DATEADD(day, -7, '{{ execution_date }}')
        GROUP BY customer_id
        """

You import sql as aql from astro, write a Python function, and aggregate the orders. With an orders table, selecting the customer ID and the count, you can use both astro templating and Jinja templating.


By defining the orders table as a type table in the function definition, we know that we can treat this as a table in the SQL statement. We are able to do all forms of security and templating checks before it actually goes to your SQL system. You can still use Jinja to make sure that all of the things that Airflow would provide, e.g. execution date, task ID, or DAG ID, are still available to you as you’re writing your DAG.
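Called inside the DAG, the transform reads like ordinary Python. The connection ID below is a placeholder, and the output_table keyword for naming the result table is an assumption that may vary by version.

with dag:
    purchase_counts = aggregate_orders(
        orders_table=Table(table_name="orders", conn_id="my_postgres_conn"),
        output_table=Table(table_name="purchase_counts"),  # assumption: explicitly named result table
    )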

The Load Function

@aql.load_file


with dag:
    raw_orders = aql.load_file(
        path="s3://my/s3/file.csv",
        file_conn_id="my_s3_conn",
        output_table=Table(table_name="foo", conn_id="my_postgres_conn"),
    )

This functionality simplifies loading data into SQL and pulling data out of SQL. For example, you can load a CSV or JSON file from S3, GCS, or Azure storage. Here you have the path to your S3 file or bucket and the connection ID you would use to connect to it, and the output is typed as a Table. By defining the name and the connection ID, all the metadata you put into the table object is passed on between tasks. You don’t need to keep redefining it – you only need to define it once.
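For example (a sketch reusing the hypothetical names above), the Table handle returned by load_file can be passed straight into the transform from the previous section without restating the connection:

with dag:
    raw_orders = aql.load_file(
        path="s3://my/s3/file.csv",
        file_conn_id="my_s3_conn",
        output_table=Table(table_name="foo", conn_id="my_postgres_conn"),
    )
    # The Postgres connection and table name travel with the object,
    # so the downstream call only needs the handle itself.
    purchase_counts = aggregate_orders(orders_table=raw_orders)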

The Merge Function

@aql.merge

aql.merge(
    target_table="merge_test_1",
    merge_table="merge_test_2",
    merge_keys={"list": "list", "sell": "sell"},
    target_columns=["list", "sell"],
    merge_columns=["list", "sell"],
    conn_id="snowflake_conn",
    database="DWH_LEGACY",
    schema="SANDBOX_DANIEL",
    conflict_strategy="ignore",
)

The merge function makes it possible to switch between SQL flavors like PostgreSQL, Snowflake, or MySQL without changing your actual code. Even though they handle merges differently, you can switch from one to another with this function, and everything will still work on all of those systems.
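As a sketch of that portability (a hypothetical variant, assuming the same arguments are accepted for a Postgres connection named my_postgres_conn), switching backends amounts to changing the connection metadata:

aql.merge(
    target_table="merge_test_1",
    merge_table="merge_test_2",
    merge_keys={"list": "list", "sell": "sell"},
    target_columns=["list", "sell"],
    merge_columns=["list", "sell"],
    conn_id="my_postgres_conn",  # hypothetical Postgres connection; merge semantics are handled per backend
    conflict_strategy="ignore",
)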

Dataframe Integration

@df

from astro import dataframe as df
from pandas import DataFrame
from sklearn.model_selection import train_test_split
import xgboost as xgb

@df
def train_model(df: DataFrame):
    # Separate the label column from the feature columns
    dfy = df.loc[:, "recent_purchase"]
    dfx = df.drop(columns=["customer_id", "recent_purchase"])
    dfx_train, dfx_test, dfy_train, dfy_test = train_test_split(
        dfx, dfy, test_size=0.2, random_state=63
    )
    # Train a gradient-boosted classifier on the training split
    model = xgb.XGBClassifier(
        n_estimators=100,
        eval_metric="logloss",
    )
    model.fit(dfx_train, dfy_train)
    preds = model.predict(dfx_test)
    return model

You can use this dataframe decorator, and if you pass a SQL table into the function at DAG-writing time, that table will automatically be converted into a dataframe. You run your Python function and then either return the result as a dataframe or send it back to the SQL system as a SQL table – you don’t need to make that decision until the DAG is running.
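Putting the pieces together, here is a sketch (hypothetical names again, and assuming the feature table carries the columns train_model expects) of handing a SQL table produced by a transform straight to the decorated Python function:

with dag:
    features = aggregate_orders(
        orders_table=Table(table_name="orders", conn_id="my_postgres_conn")
    )
    # `features` still refers to a SQL table here; the @df decorator converts
    # it to a pandas DataFrame only when the train_model task actually runs.
    model = train_model(features)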

Code Examples

The astro operator can be found on the Astronomer Registry.

Would you like to learn more? Check out our Astro for ETL and Introduction to Airflow Decorators guides, or the Astro repo: https://github.com/astro-projects/astro

Getting Apache Airflow Certified

Join the thousands of other data engineers who have received the Astronomer Certification for Apache Airflow Fundamentals. This exam assesses an understanding of the basics of the Airflow architecture and the ability to create simple data pipelines for scheduling and monitoring tasks.

Keep Your Data Flowing with Astro

Get a demo that’s customized around your unique data orchestration workflows and pain points.

Get Started