WEBINAR

Everything You Need to Know About Airflow 2.2

Recorded On October 5, 2021


Presented by Ash Berlin-Taylor, Director of Airflow Engineering at Astronomer

Airflow 2.2.0 is finally almost here! The new release combines two big new features with a whole lot of small quality-of-life improvements to make Airflow even more powerful, and it also fixes some long-standing complaints.

Airflow 2.2.0 new features


AIP stands for Airflow Improvement Proposal. Any big architectural change, or any fundamental change to the way Airflow operates, goes through the AIP process and a vote. This ensures that big, fundamental changes get buy-in from the community.

1. AIP-39: Custom Timetables

  • Schedule where you couldn’t go before!
    • Cron expressions only got us as far as regular time intervals
    • For example, “daily Monday through Friday, but not weekends” wasn’t possible
    • Full backward compatibility is maintained; schedule_interval is not going away
    • Timetables also introduce an explicit “data interval” - super useful when a run looks at data for a specific period of time
    • It’s now possible to process Friday’s data on Saturday, or on any other funky interval


  • No more “why didn’t my dag run yet?”

    • The concept of “execution_date” was confusing to every new user, so it is now deprecated! In its place there are:
      • logical_date (aka execution_date)
      • data_interval_start (same value as execution_date, at least for the built-in timetables)
      • data_interval_end (same value as next_execution_date, at least for the built-in timetables)
      • See the short sketch after this list for reading these fields inside a task
  • Pluggable timetables! Airflow 2.2 ships with a few built-in timetables that mirror the behavior of schedule_interval.
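
For instance, here is a minimal sketch of a TaskFlow task reading the new interval fields from its context (the DAG id, schedule, and start date are made up):

import pendulum

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

@dag(schedule_interval="@daily", start_date=pendulum.datetime(2021, 10, 1), catchup=False)
def interval_demo():
    @task
    def report():
        # Airflow 2.2 exposes the run's data interval in the task context.
        ctx = get_current_context()
        print(f"Data from {ctx['data_interval_start']} to {ctx['data_interval_end']}")

    report()

interval_demo_dag = interval_demo()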

You can also add your own timetable! Example timetable:

from __future__ import annotations

import datetime

import pendulum
from croniter import croniter
from pendulum.tz.timezone import Timezone

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.utils.dates import cron_presets

class RunAtTimetable(Timetable):
    def __init__(self, cron: str, timezone: Timezone) -> None:
        # Resolve presets such as "@daily" to a plain cron expression.
        self._expression = cron_presets.get(cron, cron)
        self._timezone = timezone

    def infer_manual_data_interval(self, *, run_after: pendulum.DateTime) -> DataInterval:
        # A manually triggered run covers exactly the moment it was triggered.
        return DataInterval.exact(run_after)

    def next_dagrun_info(self, *,
                         last_automated_data_interval: DataInterval | None,
                         restriction: TimeRestriction) -> DagRunInfo | None:
        # TODO: handle restriction.latest and restriction.catchup
        if last_automated_data_interval is not None:
            when = last_automated_data_interval.end
        elif restriction.earliest is not None:
            when = restriction.earliest
        else:
            return None  # no start_date set, nothing to schedule yet
        cron = croniter(self._expression, start_time=when)
        scheduled = pendulum.instance(cron.get_next(datetime.datetime))
        return DagRunInfo.exact(scheduled.in_timezone(self._timezone))
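
To actually use a custom timetable, it must be registered through the plugin system. Here is a minimal sketch (the plugin name, dag_id, and cron expression are made up), assuming the RunAtTimetable class above is importable:

import pendulum

from airflow.models.dag import DAG
from airflow.plugins_manager import AirflowPlugin

# The scheduler discovers custom timetables via plugins.
class RunAtTimetablePlugin(AirflowPlugin):
    name = "run_at_timetable_plugin"
    timetables = [RunAtTimetable]

# A DAG then passes timetable= instead of schedule_interval=.
with DAG(
    dag_id="weekday_mornings",
    timetable=RunAtTimetable("0 9 * * MON-FRI", pendulum.timezone("America/New_York")),
    start_date=pendulum.datetime(2021, 10, 1, tz="America/New_York"),
):
    ...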
  • Limitations

    • A timetable should return the same result every time it’s called (no HTTP requests, please - event-based triggering is coming in the future)
    • Timetables are evaluated inside the scheduler while it creates DagRuns, so they must be fast and error-free
  • NYSE trading timetable (Astronomer customers only!)

from airflow.decorators import task
from airflow.models.dag import DAG
from astronomer.timetables.trading_hours import USTradingHoursTimetable

with DAG("nyse_trades", timetable=USTradingHoursTimetable()):  # dag_id is illustrative
    @task.python
    def fetch_daily_trades():
        ...

    fetch_daily_trades()

2. AIP-40: Deferrable Tasks

Allows tasks or sensors to free up worker resources when waiting for external systems/events.


  • Ideal use case: submit then poll operators

    • Airbnb introduced smart sensors, a first attempt at tackling this issue
    • Deferrable tasks are a great fit for anything that submits a job to an external system and then polls for its status - not just sensors (see the sketch after this list)
    • A deferred task does not consume a worker slot - instead, hundreds of deferrals run at once in a single async process
    • Uses fewer resources, and is more reliable
    • Unlike smart sensors, doesn’t need a dedicated DAG running
  • Advantages of async: one process waiting on many events concurrently, instead of one blocked worker process per task

  • New component: the triggerer - an asyncio-based process, started with “airflow triggerer”, that runs deferred tasks’ triggers

  • Async operators:
    • DateTimeSensorAsync

    • TimeDeltaSensorAsync

    • Astronomer customers only:

      • DatabricksRunNowOperatorAsync
      • DatabricksSubmitRunOperatorAsync
      • HttpSensorAsync
      • ExternalTaskSensorAsync
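
To show how a deferrable sensor drops into a DAG, here is a minimal sketch using the open-source DateTimeSensorAsync (the dag_id, schedule, and target time are made up):

import pendulum

from airflow.models.dag import DAG
from airflow.sensors.date_time import DateTimeSensorAsync

with DAG(
    dag_id="deferrable_demo",
    schedule_interval="@daily",
    start_date=pendulum.datetime(2021, 10, 1, tz="UTC"),
    catchup=False,
):
    # While waiting, this task hands off to the triggerer and frees its
    # worker slot; a plain DateTimeSensor would occupy the slot the whole time.
    wait = DateTimeSensorAsync(
        task_id="wait_until_nine_thirty",
        target_time="{{ data_interval_end.replace(hour=9, minute=30) }}",
    )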

3. @task.docker decorator

The new @task.docker decorator runs a TaskFlow Python function inside a Docker container (it ships with the Docker provider package):

from airflow.decorators import task

@task.docker(image='python:3.9-slim-buster', multiple_outputs=True)
def transform(order_data_dict: dict):
    # This function body executes inside the python:3.9-slim-buster container.
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}
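
Inside a DAG, the decorated function is then called like any other TaskFlow task; the input values below are made up:

# Within a `with DAG(...)` block or an @dag-decorated function:
order_summary = transform({"book": 10.0, "pen": 5.0})
# multiple_outputs=True unpacks the returned dict, so downstream tasks
# can pull "total_order_value" as its own XCom value.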

4. Other features

  • Validation of DAG params

The building blocks for true parameterized DAGs

from airflow.models.dag import DAG
from airflow.models.param import Param

with DAG(
  'my_dag',
  params={
    # an int param with a default value
    'int_param': Param(10, type='integer', minimum=0, maximum=20),
    # a mandatory str param (no default)
    'str_param': Param(type='string', minLength=2, maxLength=4),
    # a param which can be None as well
    'dummy_param': Param(type=['null', 'number', 'string']),
    # the old way of passing params - no data or type validation
    'old_param': 'old_way_of_passing',
    # a Param without a schema - no data or type validation either
    'simple_param': Param('im_just_like_old_param'),
    # validated against JSON Schema's idn-email format
    'email_param': Param(
        default='example@example.com',
        type='string',
        format='idn-email',
        minLength=5,
        maxLength=255,
    ),
  },
):
  ...
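
Params are then available to templated fields; a minimal sketch (the task id and command are made up):

# Inside the `with DAG(...)` block above:
from airflow.operators.bash import BashOperator

use_params = BashOperator(
    task_id='use_params',
    bash_command='echo {{ params.int_param }} {{ params.str_param }}',
)

Values supplied when triggering a run are checked against these schemas, so an int_param above the maximum should be rejected by validation.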
  • airflow standalone - runs all the Airflow components (database migrations, scheduler, webserver, triggerer, etc.) directly, with no Docker required. Handy for local development.

Thank you for your attention and see you on the day of the release!

