Version: Airflow 3.x

Manage Apache Airflow® DAG notifications

When you're using a data orchestration tool, how do you know when something has gone wrong? Apache Airflow® users can check the Airflow UI to determine the status of their DAGs, but this is an inefficient way of managing errors systematically, especially if certain failures need to be addressed promptly or by multiple team members. Fortunately, Airflow has built-in notification mechanisms that can be leveraged to configure error notifications in a way that works for your organization.

In this guide, you'll learn the basics of Airflow notifications and how to set up common notification mechanisms including SMTP, pre-built, and custom notifiers. You'll also learn how to leverage Airflow alerting when using Astro.

Other ways to learn

There are multiple resources for learning about this topic. See also:

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Airflow notification types

Airflow has a few options for notifying you on the status of your DAGs and tasks:

  • SMTP Provider: In Airflow 3.0, the SMTP provider replaces the deprecated built-in email configuration for sending notifications. It allows Airflow to send emails via an external SMTP service by defining an email backend connection, making alerting more modular and extensible.
  • Airflow callbacks: Callback parameters (*_callback) exist both at the task and at the DAG level. You can pass any callable or Airflow notifier to these parameters, and Airflow will run them in the case of specific events, such as a task failure. Airflow callbacks offer a lot of flexibility to execute any code based on the state of a task or DAG. They are often used to define actions for specific instances of task failures or successes.
  • Airflow notifiers: Notifiers are custom classes for Airflow callbacks that can be easily reused and standardized. Provider packages can ship pre-built notifiers like the SlackNotifier. Notifiers can be provided to callback parameters to define which task or DAG state should cause them to be executed. A common use case for notifiers is standardizing actions for task failures across several Airflow instances.

Most notifications can be set at the level of both a DAG and a task. Setting a parameter within a DAG's default_args dictionary will apply it to all tasks in the DAG. You can see examples of this in the set DAG and task-level callbacks section.

tip

The OSS notification library Apprise contains modules to send notifications to many services. You can use Apprise with Airflow by installing the Apprise Airflow provider which contains the AppriseNotifier. See the Apprise Airflow provider documentation for more information and examples.

Choose a notification type

It's best practice to use pre-built solutions whenever possible. This approach makes your DAGs more robust by reducing custom code and standardizing notifications across different Airflow environments.

To deliver alerts to email, use the SMTP provider; use the SmtpNotifier for other events, such as successful task runs.

If a notifier class exists for your use case, use it instead of a custom callback. See the Airflow documentation for an up-to-date list of available notifiers and the Apprise wiki for a list of services the Apprise notifier can connect to.

A notifier can be provided to any callback parameter (*_callback). Only use custom Airflow callbacks when no notifier is available for your use case.

tip

To execute custom code based on events happening anywhere in your Airflow environment, for example whenever any dataset is updated or any task instance fails, you can use Airflow listeners. See the Use a listener to send a Slack notification when a Dataset is updated tutorial for an example.

SMTP Provider

As stated above, the email configuration from Airflow 2.x is deprecated in Airflow 3.0 and will be removed in Airflow 4.x. The preferred method for email notifications is now the SMTP provider with the SmtpNotifier class. You can find documentation on the SMTP provider here.

First, create a connection to your SMTP server in the Airflow UI (Admin → Connections) with the following settings:

  • Conn ID: smtp_default

  • Conn Type: SMTP

  • Host: e.g., smtp.gmail.com

  • Login: your SMTP username

  • Password: your SMTP password or app-specific password

  • Port: e.g., 587

info

It is not recommended to use a Conn ID other than smtp_default, because this conn ID is hardcoded for this notification type and passed in automatically. You can create a custom ID, but doing so requires a custom notification function that uses it.

You can then easily use the SmtpNotifier in your DAG:

from airflow.providers.smtp.notifications.smtp import SmtpNotifier

@dag(
    default_args={
        "on_failure_callback": SmtpNotifier(to="airflow@airflow.com"),
        "on_retry_callback": SmtpNotifier(to="airflow@airflow.com"),
    },
)

Airflow callbacks

In Airflow you can define actions to be taken due to different DAG or task states using *_callback parameters:

  • on_success_callback: Invoked when a task or DAG succeeds.
  • on_failure_callback: Invoked when a task or DAG fails.
  • on_skipped_callback: Invoked when a task is skipped. Added in Airflow 2.9, this callback only exists at the task level, and is only invoked when an AirflowSkipException is raised, not when a task is skipped for other reasons, such as a trigger rule. See Callback Types.
  • on_execute_callback: Invoked right before a task begins executing. This callback only exists at the task level.
  • on_retry_callback: Invoked when a task is retried. This callback only exists at the task level.
  • sla_miss_callback: Invoked when a task or DAG misses its defined Service Level Agreement (SLA). This callback is defined at the DAG level for DAGs with defined SLAs and will be applied to every task.

You can provide any Python callable or Airflow notifier to the *_callback parameters. To execute multiple functions for the same event, provide them as a list to the same callback parameter.
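When a list is passed to a callback parameter, Airflow invokes each item in order with the task's context. A plain-Python sketch of that dispatch follows; the callback names and the simplified context dictionary are hypothetical stand-ins, not Airflow internals.

```python
# Hypothetical sketch of how a list passed to on_failure_callback behaves:
# each callable is invoked in order with the task instance's context.
def log_failure(context):
    return f"logged failure for {context['task_id']}"


def page_on_call(context):
    return f"paged on-call for {context['task_id']}"


on_failure_callback = [log_failure, page_on_call]

# Simulate what happens when the task fails:
context = {"task_id": "extract"}
results = [callback(context) for callback in on_failure_callback]
```

Because every item only needs to be callable with a context argument, plain functions and notifier instances can be freely mixed in the same list.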

Notifiers

Airflow notifiers are pre-built or custom classes and can be used to standardize and modularize the functions you use to send notifications. Notifiers can be passed to the relevant *_callback parameter of your DAG depending on what event you want to trigger the notification.

info

You can find a full list of all pre-built notifiers created for Airflow providers here and connect to many more services through the AppriseNotifier.

Notifiers are defined in provider packages or imported from the include folder and can be used across any of your DAGs. This feature has the advantage that community members can define and share functionality previously used in callback functions as Airflow modules, creating pre-built callbacks to send notifications to other data tools.

An Airflow notifier can be created by inheriting from the BaseNotifier class and defining the action which should be taken in case the notifier is used in the .notify() method.

class MyNotifier(BaseNotifier):
    """
    Basic notifier, prints the task_id, state and a message.
    """

    template_fields = ("message",)

    def __init__(self, message):
        self.message = message

    def notify(self, context):
        t_id = context["ti"].task_id
        t_state = context["ti"].state
        print(
            f"Hi from MyNotifier! {t_id} finished as: {t_state} and says {self.message}"
        )
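Under the hood, a notifier instance is itself callable: BaseNotifier implements __call__, which delegates to notify() with the current context. That is why an instance can be passed anywhere a plain callback function is accepted. A minimal stand-in class (not Airflow's actual BaseNotifier) illustrating the pattern:

```python
# Minimal stand-in illustrating why a notifier instance works as a callback:
# the instance is callable and delegates to .notify(). This is a sketch,
# not Airflow's real BaseNotifier implementation.
class SketchNotifier:
    def __init__(self, message):
        self.message = message

    def notify(self, context):
        return f"{context['task_id']} says {self.message}"

    def __call__(self, context):
        return self.notify(context)


callback = SketchNotifier(message="hello")
result = callback({"task_id": "t1"})  # invoked exactly like a plain function
```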

To use the custom notifier in a DAG, provide its instantiation to any callback parameter. For example:

@task(
    on_failure_callback=MyNotifier(message="Hello failed!"),
)
def t1():
    return "hello"

Example pre-built notifier: Slack

An example of a community-provided pre-built notifier is the SlackNotifier.

It can be imported from the Slack provider package and used with any *_callback function:

"""
Example showing how to use the SlackNotifier. Needs a Slack connection set
up with Slack API Token for a Slack bot (starts with 'xoxb-...')
"""

from airflow.decorators import dag, task
from pendulum import datetime
from airflow.providers.slack.notifications.slack_notifier import SlackNotifier

SLACK_CONNECTION_ID = "slack_conn"
SLACK_CHANNEL = "alerts"
SLACK_MESSAGE = """
Hello! The {{ ti.task_id }} task is saying hi :wave:
Today is the {{ ds }} and this task finished with the state: {{ ti.state }} :tada:.
"""


@dag(
start_date=datetime(2023, 4, 18),
schedule=None,
catchup=False,
tags=["Notifier", "Slack"],
)
def slack_notifier_example_dag():
@task(
on_success_callback=SlackNotifier(
slack_conn_id=SLACK_CONNECTION_ID,
text=SLACK_MESSAGE,
channel=SLACK_CHANNEL,
),
)
def post_to_slack():
return 10

post_to_slack()


slack_notifier_example_dag()

The DAG above has one task sending a notification to Slack. It uses a Slack Airflow connection with the connection ID slack_conn.

Slack notification

Set DAG and task-level custom callbacks

To define a custom notification at the DAG level, you can set the *_callback parameters in your DAG instantiation. DAG-level notifications will trigger callback functions based on the state of the entire DAG run.

def my_success_callback_function(context):
    pass


def my_failure_callback_function(context):
    pass


def my_sla_callback_function(context):
    pass


@dag(
    start_date=datetime(2023, 4, 25),
    schedule="@daily",
    catchup=False,
    on_success_callback=my_success_callback_function,
    on_failure_callback=my_failure_callback_function,
    sla_miss_callback=my_sla_callback_function,
)

To apply a task-level callback to each task in your DAG, you can pass the callback function to the default_args parameter. Items listed in the dictionary provided to the default_args parameter will be set for each task in the DAG.

def my_execute_callback_function(context):
    pass


def my_retry_callback_function(context):
    pass


def my_success_callback_function(context):
    pass


def my_failure_callback_function(context):
    pass


def my_skipped_callback_function(context):
    pass


@dag(
    start_date=datetime(2023, 4, 25),
    schedule="@daily",
    catchup=False,
    default_args={
        "on_execute_callback": my_execute_callback_function,
        "on_retry_callback": my_retry_callback_function,
        "on_success_callback": my_success_callback_function,
        "on_failure_callback": my_failure_callback_function,
        "on_skipped_callback": my_skipped_callback_function,
    },
)

For use cases where an individual task should use a specific callback, the task-level callback parameters can be defined in the task instantiation. Callbacks defined at the individual task level will override callbacks passed in via default_args.

def my_execute_callback_function(context):
    pass


def my_retry_callback_function(context):
    pass


def my_success_callback_function(context):
    pass


def my_failure_callback_function(context):
    pass


def my_skipped_callback_function(context):
    pass


@task(
    on_execute_callback=my_execute_callback_function,
    on_retry_callback=my_retry_callback_function,
    on_success_callback=my_success_callback_function,
    on_failure_callback=my_failure_callback_function,
    on_skipped_callback=my_skipped_callback_function,
)
def t1():
    return "hello"

Astronomer notifications

Airflow's built-in notification mechanisms are great for common use cases, but they have some limitations. For the cases where Airflow notifications aren't sufficient, Astro alerts provide an additional level of observability. For guidance on when to choose Airflow notifications or Astro alerts, see When to use Airflow or Astro alerts for your pipelines on Astro.
