Webinar Recap

Everything You Need to Know About Airflow 2.2

Presented by Ash Berlin-Taylor, Director of Airflow Engineering at Astronomer

Airflow 2.2.0 is finally almost here! The new release combines two big new features with a whole lot of small quality-of-life improvements that make Airflow even more powerful, and it also fixes some long-standing complaints.

Airflow 2.2.0 new features

AIP stands for Airflow Improvement Proposal. Any big architectural change, or any fundamental change to the way Airflow operates, goes through the Airflow Improvement Proposal process and a vote. This ensures that big, fundamental changes get buy-in from the community.

1. AIP-39: Custom Timetables

  • Schedule where you couldn’t go before!

    • Cron expressions only got us as far as regular time intervals
    • For example, "daily Monday through Friday, but not weekends" wasn't possible
    • Full backwards compatibility is maintained; schedule_interval is not going away
    • Timetables also introduce an explicit "data interval" - super useful when looking at the data for a specific period of time
    • It's now possible to process Friday's data on Saturday, or define any other funky interval

  • No more "why didn't my DAG run yet?"

    • The concept of "execution_date" was confusing to every new user, so it is now deprecated! In its place there are:
    • logical_date (aka execution_date)
    • data_interval_start (same value as execution_date, at least for the built-in timetables)
    • data_interval_end (same value as next_execution_date, at least for the built-in timetables)
  • Pluggable timetables! Airflow 2.2 ships with a few built-in timetables that mirror the behavior of schedule_interval.

You can also add your own timetable! Example timetable:

from __future__ import annotations

import datetime

import pendulum
from croniter import croniter
from pendulum import DateTime
from pendulum.tz.timezone import Timezone

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.utils.dates import cron_presets

class RunAtTimetable(Timetable):
    def __init__(self, cron: str, timezone: Timezone) -> None:
        self._expression = cron_presets.get(cron, cron)  # resolve "@daily" etc.
        self._timezone = timezone

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # Manually triggered runs get a zero-width data interval.
        return DataInterval.exact(run_after)

    def next_dagrun_info(
        self, *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        # TODO: handle restriction.latest and restriction.catchup
        when = (last_automated_data_interval.end
                if last_automated_data_interval else restriction.earliest)
        if when is None:  # no start_date, so nothing to schedule yet
            return None
        cron = croniter(self._expression, start_time=when)
        scheduled = pendulum.instance(cron.get_next(datetime.datetime))
        return DagRunInfo.exact(scheduled.in_timezone(self._timezone))
  • Limitations

    • Should return the same result every time it's called (no HTTP requests, please - event-based triggering is coming in the future)
    • Timetables are "evaluated" inside the scheduler when creating DagRuns, so they need to be fast and error-free
  • NYSE trading timetable (Astronomer customers only!)

    from airflow import DAG
    from airflow.decorators import task
    from astronomer.timetables.trading_hours import USTradingHoursTimetable

    with DAG("us_trading_hours", timetable=USTradingHoursTimetable()):  # illustrative dag_id
        @task.python
        def fetch_daily_trades():
            ...
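
One detail the slides don't show: the scheduler has to be able to deserialize a custom timetable, so it must be registered through a plugin. Here's a minimal sketch for the RunAtTimetable example above (the plugin and module names are illustrative):

from airflow.plugins_manager import AirflowPlugin

from my_timetables import RunAtTimetable  # wherever you defined it

class RunAtTimetablePlugin(AirflowPlugin):
    name = "run_at_timetable_plugin"
    # Timetables listed here become available to DAGs and the scheduler.
    timetables = [RunAtTimetable]

A DAG can then pass timetable=RunAtTimetable(...) in place of schedule_interval.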

2. AIP-40: Deferrable Tasks

Allows tasks or sensors to free up worker resources when waiting for external systems/events.

  • Ideal use case: submit then poll operators

    • Airbnb introduced smart sensors, a first attempt at tackling this problem
    • Deferrable tasks are a great fit for anything that submits a job to an external system and then polls for its status (not just sensors) - see the sketch at the end of this section
    • A deferred task does not consume a worker slot - instead, hundreds can wait at once inside a single async process
    • Uses fewer resources, and is more reliable
    • Unlike smart sensors, doesn't need a dedicated DAG running
  • Advantages of async

  • New component: the triggerer! A daemon process that runs the asyncio event loop where deferred tasks wait, started with the airflow triggerer command.

  • Async operators:

    • DateTimeSensorAsync
    • TimeDeltaSensorAsync
    • Astronomer customers only:
      • DatabricksRunNowOperatorAsync
      • DatabricksSubmitRunOperatorAsync
      • HttpSensorAsync
      • ExternalTaskSensorAsync
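
To get a feel for the mechanics, here is a minimal sketch of a deferrable sensor (an illustrative example following the pattern described in the Airflow docs, not code from the webinar) that parks itself on the triggerer for an hour using the built-in DateTimeTrigger:

from datetime import timedelta

from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

class WaitOneHourSensor(BaseSensorOperator):
    def execute(self, context):
        # Suspend the task and free its worker slot; the triggerer's
        # asyncio event loop waits on the trigger instead.
        self.defer(
            trigger=DateTimeTrigger(moment=timezone.utcnow() + timedelta(hours=1)),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Resumed on a worker once the trigger fires.
        return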

3. @task.docker decorator
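
The new @task.docker decorator runs a TaskFlow function inside a Docker container - handy when a task needs dependencies that aren't installed on the worker: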

from airflow.decorators import task

@task.docker(image='python:3.9-slim-buster', multiple_outputs=True)
def transform(order_data_dict: dict):
    # This function body executes inside the container, not on the worker.
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}
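
Because multiple_outputs=True, the returned dict is unrolled into one XCom entry per key, so downstream tasks can pull total_order_value directly.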

4. Other features

  • Validation of DAG params

The building blocks for truly parameterized DAGs:

from airflow import DAG
from airflow.models.param import Param

with DAG(
  'my_dag',
  params={
    # an int param with a default value
    'int_param': Param(10, type='integer', minimum=0, maximum=20),
    # a mandatory str param
    'str_param': Param(type='string', minLength=2, maxLength=4),
    # a param which can also be None
    'dummy_param': Param(type=['null', 'number', 'string']),
    # the old way of passing params, i.e. no data or type validation
    'old_param': 'old_way_of_passing',
    # just like old_param, i.e. no data or type validation
    'simple_param': Param('im_just_like_old_param'),
    # JSON Schema string formats are validated too
    'email_param': Param(
        default='example@example.com',
        type='string',
        format='idn-email',
        minLength=5,
        maxLength=255,
    ),
  },
):
  ...
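
Inside the DAG body, the validated values are then available to tasks through the usual templating. A quick, hypothetical example:

from airflow.operators.bash import BashOperator

# Renders the validated value of int_param at runtime.
echo = BashOperator(task_id='echo', bash_command='echo {{ params.int_param }}')

A run triggered with params that fail the JSON Schema validation is rejected with an error rather than silently proceeding.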

  • Airflow standalone: runs all the Airflow components (database migrations, scheduler, webserver, triggerer, etc.) locally with a single airflow standalone command - no Docker needed.

Thank you for your attention and see you on the day of the release!

Getting Apache Airflow Certified

Join the thousands of other data engineers who have received the Astronomer Certification for Apache Airflow Fundamentals. This exam assesses an understanding of the basics of the Airflow architecture and the ability to create simple data pipelines for scheduling and monitoring tasks.