>
WEBINARS

Everything you Need to Know About Airflow 2.2

Watch Video On Demand

Hosted By

  • Ash Berlin-Taylor

Presented by Ash Berlin-Taylor, Director of Airflow Engineering at Astronomer

Airflow 2.2.0 is finally almost here! The new release combines two new big features and a whole lot of small quality of life improvements to make Airflow even more powerful, and also fixes some long-standing complaints.

Airflow 2.2.0 new features

everything-you-need-to-know-about-airflow-2-2-image2

AIP stands for Airflow Improvement Proposal. Any kind of big architectural change or a fundamental change to the way Airflow operates goes through the Airflow improvement proposal process and a vote. It ensures that the big fundamental changes get by in front of the community.

1. AIP-39: Custom Timetables

everything-you-need-to-know-about-airflow-2-2-image5

You can also add your own timetable! Example timetable:

class RunAtTimetable(Timetable):
   def __init__(self, cron: str, timezone: Timezone) -> None:
       self._expression = cron_presets.get(cron, cron)
       self._timezone = timezone

   def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
       return DataInterval(None, None)

   def next_dagrun_info(self, *,
                        last_automated_data_interval: DataInterval | None,
                        restriction: TimeRestriction) -> DagRunInfo | None:
       # TODO: handle restriction.latest and restriction.catchup
       when = last_automated_data_interval or restriction.earliest
       cron = croniter(self._expression, start_time=when)
       scheduled = cron.get_next(datetime.datetime)

       return make_aware(scheduled.in_timezone(self._timezone))
from astronomer.timetables.trading_hours \
    import USTradingHoursTimetable

    with DAG(timetable=USTradingHoursTimetable()):
       @task.python
       def fetch_daily_trades():

2. AIP-40: Deferrable Tasks

Allows tasks or sensors to free up worker resources when waiting for external systems/events.

everything-you-need-to-know-about-airflow-2-2-image1

everything-you-need-to-know-about-airflow-2-2-image3

everything-you-need-to-know-about-airflow-2-2-image4

3. @task.docker decorator

@task.docker(image='python:3.9-slim-buster', multiple_outputs=True)
def transform(order_data_dict: dict):
   total_order_value = 0

   for value in order_data_dict.values():
       total_order_value += value

   return {"total_order_value": total_order_value}

4. Other features

The building blocks for true parameterized DAGs

with DAG(
  'my_dag',
  params: {
    # a int param with default value
    'int_param': Param(10, type='integer', minimum=0, maximum=20),
    # a mandatory str param
    'str_param': Param(type='string', minLength=2, maxLength=4),  
    # a param which can be None as well
    'dummy_param': Param(type=['null', 'number', 'string']),
    # i.e. no data or type validations
    'old_param': 'old_way_of_passing',
    # i.e. no data or type validations
    'simple_param': Param('im_just_like_old_param'),
    'email_param': Param(
        default='example@example.com',
        type='string',
        format='idn-email',
        minLength=5,
        maxLength=255,
    ),
  }
):

Thank you for your attention and see you on the day of the release!

Ready to Get Started?

Get Started Free

Try Astro free for 14 days and power your next big data project.