Manage Apache Airflow® Dag notifications

When you’re using a data orchestration tool, how do you know when something has gone wrong? Apache Airflow® users can check the Airflow UI to determine the status of their Dags, but this is an inefficient way of managing errors systematically, especially if certain failures need to be addressed promptly or by multiple team members. Fortunately, Airflow has several notification mechanisms that can be used to configure error notifications in a way that works for your organization.

In this guide, you’ll learn how to set up common Airflow notification mechanisms, including email (SMTP) notifications, Airflow callbacks, and notifiers.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Notification types

When setting up Airflow notifications, you must first decide between using Airflow’s built-in notification system, an external monitoring service, or a combination of both. The three types of notifications available when running Airflow on Astro are:

The advantage of Airflow notifications is that you can define them directly in your Dag code. The downside is that you need Airflow to be running to send notifications, which means you might run into silent failures if there is an issue with your Airflow infrastructure. Airflow notifications also have some limitations, for example when defining SLAs and timeouts.

For the cases where Airflow notifications aren’t sufficient, Astro alerts and Astro Observe provide an additional level of observability. For guidance on when to choose Airflow notifications or Astro alerts, see When to use Airflow or Astro alerts for your pipelines on Astro.

Airflow notification concepts

When defining notifications in Airflow you should understand the following concepts:

Choose your Airflow notification method

It’s best practice to use pre-built solutions whenever possible. This approach makes your Dags more robust by reducing custom code and standardizing notifications across different Airflow environments.

If you want to deliver notifications via email, use the SmtpNotifier or the EmailOperator. If you want to use another email service like SendGrid or Amazon SES, see the Airflow documentation for more information.

If you want to be notified via another system, check if a notifier class exists for your use case. See the Airflow documentation for an up-to-date list of available Notifiers and the Apprise wiki for a list of services the AppriseNotifier can connect to.

Only use custom callback functions when no notifier is available for your use case. Consider writing a custom notifier to standardize the code you use to send notifications.

If you want to execute code based on events happening anywhere in your Airflow environment, for example whenever any asset is updated, a Dag run fails, or a new import error is detected, you can use Airflow listeners.

Email (SMTP) notifications

Airflow email notifications can be set up in three different ways:

  • (Recommended) You can provide the SmtpNotifier with any callback parameter to send emails when a Dag or task reaches a specific state.
  • (Recommended) You can use the EmailOperator to create dedicated tasks in your Dags to send emails.
  • (Legacy) You can configure email notifications using the email task parameter in combination with Airflow configuration variables in the SMTP section. This approach has limitations and will be removed in a future version. See (Legacy) Email notifications using configuration variables.

All email notifications require you to install the SMTP provider by adding it to your requirements.txt file.

apache-airflow-providers-smtp

Using the SmtpNotifier

The SmtpNotifier is a pre-built notifier that can be provided to any callback parameter to send emails when a Dag or task reaches a specific state.

To connect the notifier to your SMTP server, you need to create an Airflow connection, for example by setting the following environment variable to create the smtp_default connection:

AIRFLOW_CONN_SMTP_DEFAULT='{
    "conn_type": "smtp",
    "host": "smtp.yourdomain.com",
    "port": <your-port>,
    "login": "<your-username>",
    "password": "<your-password>",
    "extra": {
        "disable_ssl": <your-setting>,
        "disable_tls": <your-setting>
    }
}'

The main parameters to configure for the SmtpNotifier are:

  • smtp_conn_id: The ID of the Airflow connection to your SMTP server. Default: smtp_default.
  • to: The email address to send the email to. You can provide a single email address as a string or multiple in a list. Default: None. This parameter is required.
  • cc: The email address to send the email to as a carbon copy. You can provide a single email address as a string or multiple in a list. Default: None.
  • bcc: The email address to send the email to as a blind carbon copy. You can provide a single email address as a string or multiple in a list. Default: None.
  • from_email: The email address to send the email from. Default: None.
  • subject: The subject of the email. Default: None.
  • html_content: The HTML content of the email. Default: None.
  • files: The files to attach to the email as a list of file paths. Default: None.
  • custom_headers: A dictionary of custom headers to add to the email. Default: None.

You provide the instantiated notifier class directly to any callback parameter to send emails when that callback is triggered. To add information about the Dag run to the email, use Jinja templating. All parameters listed above other than smtp_conn_id are templatable.

For example, to send an email notification when a task fails that includes information about the task, as well as the error message ({{ exception }}) and a link to the task’s log ({{ ti.log_url }}), you can use the SmtpNotifier as shown in the code example below.

from airflow.sdk import task
from airflow.providers.smtp.notifications.smtp import SmtpNotifier


@task(
    on_failure_callback=SmtpNotifier(
        from_email="testnotifier@test.com",
        to=["primary@test.com"],
        cc=["manager@test.com", "team-lead@test.com"],
        bcc=["audit@test.com", "monitoring@test.com"],
        subject="{{ ti.task_id }} failed in {{ dag.dag_id }}",
        html_content="""
        <html>
        <body>
            <h2 style="color: red;">Task Failure Alert</h2>
            <p><strong>Task:</strong> {{ ti.task_id }}</p>
            <p><strong>DAG:</strong> {{ dag.dag_id }}</p>
            <p><strong>Execution Date:</strong> {{ ts }}</p>
            <p><strong>Log URL:</strong> {{ ti.log_url }}</p>
            <hr>
            <h3>Error Details:</h3>
            <pre>{{ exception }}</pre>
        </body>
        </html>
        """,
        files=["include/debug_info.json"],
        custom_headers={
            "X-Priority": "1",
            "X-Airflow-DAG": "{{ dag.dag_id }}",
            "X-Airflow-Task": "{{ ti.task_id }}",
            "Reply-To": "airflow-support@test.com",
        },
    )
)
def test_notifier_advanced():
    raise Exception("Oops, too much vibe coding!")

The resulting email looks like this:

Example email notification using the SmtpNotifier

If you’d like to test email formatting locally without connecting to a real SMTP server, you can use MailHog in a local Docker container to catch emails and view them in a web interface. Start the MailHog server using docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog and use the following connection string:

AIRFLOW_CONN_SMTP_DEFAULT='{
    "conn_type": "smtp",
    "host": "localhost",
    "port": 1025,
    "login": "",
    "password": "",
    "extra": {
        "disable_ssl": true,
        "disable_tls": true
    }
}'

Using the EmailOperator

You can use the EmailOperator to create dedicated tasks in your Dags to send emails. As with the SmtpNotifier, you need to have the SMTP provider installed and create an Airflow connection to your SMTP server. The main parameters are analogous to the SmtpNotifier.

from airflow.providers.smtp.operators.smtp import EmailOperator

EmailOperator(
    task_id="send_email",
    conn_id="smtp_default",
    from_email="caller@mydomain.io",
    to="receiver@mydomain.io",
    subject="Test Email",
    html_content="This is a test email",
)

(Legacy) Email notifications using configuration variables

In older Airflow versions it was common to configure email notifications using a mix of configuration variables and task parameters. This approach is deprecated as of Airflow 3.0 and will be removed in a future version.

To configure email notifications using configuration variables, both the SMTP configuration variables and the email task parameter are needed. Note that you cannot use an AIRFLOW_CONN_ connection with the email configuration parameters in Airflow 3.

The SMTP configuration variables define the connection to your SMTP server.

AIRFLOW__SMTP__SMTP_HOST=<your-smtp-host>
AIRFLOW__SMTP__SMTP_PORT=<your-port>
AIRFLOW__SMTP__SMTP_USER=<your-username>
AIRFLOW__SMTP__SMTP_PASSWORD=<your-password>
AIRFLOW__SMTP__SMTP_SSL=<your-setting>
AIRFLOW__SMTP__SMTP_TLS=<your-setting>
AIRFLOW__SMTP__SMTP_MAIL_FROM=<your-from-email>

In order for a task to be able to send an email if it fails or retries, you need to provide the email task parameter to the task to specify who to send the email to.

It is common to provide this in the default_args parameter of a Dag to apply it to all tasks in the Dag.

from airflow.sdk import dag


@dag(
    default_args={
        "email": ["myname@mydomain.com"],
    }
)
def my_dag(): ...

But you can also provide it at the task level to override the default.

from airflow.sdk import task


@task(email=["myfriend@mydomain.com"])
def test():
    print("Test")
    raise Exception("Test Exception")

If you want a task to send emails only when it fails, set the email_on_retry parameter to False. If you want it to send emails only when it retries, set the email_on_failure parameter to False.

Most of the AIRFLOW__EMAIL__ configuration variables are no longer supported in Airflow 3.0 for SMTP-based email notifications. Some of those parameters are still used when utilizing other email notification methods such as SendGrid or Amazon SES, see Email Configuration in the Airflow documentation for more information.

Airflow callbacks

In Airflow you can define actions to be taken based on different Dag or task states using *_callback parameters:

  • on_success_callback: Invoked when a task or Dag succeeds.
  • on_failure_callback: Invoked when a task or Dag fails.
  • on_skipped_callback: Invoked when a task is skipped. This callback only exists at the task level and is only invoked when an AirflowSkipException is raised, not when a task is skipped for other reasons, such as a trigger rule.
  • on_execute_callback: Invoked right before a task begins executing. This callback only exists at the task level.
  • on_retry_callback: Invoked when a task is retried. This callback only exists at the task level.

You can provide any Python callable or Airflow notifiers to the *_callback parameters. To execute multiple functions, you can provide several callback items to the same callback parameter in a list.
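A custom callback is simply a function that accepts the Airflow context dictionary. The sketch below returns the message it builds so the logic is easy to test; ti is the standard context key for the task instance, and the message format is illustrative.

```python
def notify_state(context):
    # Airflow passes the runtime context as a dictionary when the
    # callback fires; "ti" holds the task instance.
    ti = context["ti"]
    message = f"Task {ti.task_id} in Dag {ti.dag_id} finished as: {ti.state}"
    # Replace the print with a call to your notification system of choice.
    print(message)
    return message
```

You could then pass notify_state, alone or in a list alongside other callables and notifiers, to any *_callback parameter.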

Setting Dag-level callbacks

To define a notification at the Dag level, you can set the *_callback parameter in your Dag instantiation. Dag-level notifications will trigger callback functions based on the terminal state of the entire Dag run. The example below shows one function being executed when the Dag succeeds and two functions being executed when the Dag fails (one custom function and one SlackNotifier).

from airflow.sdk import dag
from airflow.providers.slack.notifications.slack_notifier import SlackNotifier


def my_success_callback_function(context):
    pass


def my_failure_callback_function(context):
    pass


@dag(
    on_success_callback=my_success_callback_function,
    on_failure_callback=[
        my_failure_callback_function,
        SlackNotifier(
            slack_conn_id="slack_conn",
            text="Dag failed",
            channel="alerts",
        ),
    ],
)
def my_dag(): ...

Airflow 3.1 added deadline alerts, an experimental feature that executes a callback when a Dag run exceeds a user-defined time threshold. Deadline alerts replace the removed SLA feature that used the sla and sla_miss_callback parameters. See Deadline alerts in the Airflow documentation for more information.

Astronomer customers should use Astro alerts and Astro Observe to define timeliness and freshness SLAs.

Setting task-level callbacks

To apply a task-level callback to each task in your Dag, you can pass the callback function to the default_args parameter. Items listed in the dictionary provided to the default_args parameter will be set for each task in the Dag. While the example shows one callback function being assigned to each callback parameter, you can provide multiple callback functions and/or notifiers to the same callback parameter in a list as well.

from airflow.sdk import dag


def my_execute_callback_function(context):
    pass


def my_retry_callback_function(context):
    pass


def my_success_callback_function(context):
    pass


def my_failure_callback_function(context):
    pass


def my_skipped_callback_function(context):
    pass


@dag(
    default_args={
        "on_execute_callback": my_execute_callback_function,
        "on_retry_callback": my_retry_callback_function,
        "on_success_callback": my_success_callback_function,
        "on_failure_callback": my_failure_callback_function,
        "on_skipped_callback": my_skipped_callback_function,
    }
)
def my_dag(): ...

For use cases where an individual task should use a specific callback, the task-level callback parameters can be defined in the task instantiation. Callbacks defined at the individual task level will override callbacks passed in via default_args.

Taskflow
from airflow.sdk import task


def my_execute_callback_function(context):
    pass


def my_retry_callback_function(context):
    pass


def my_success_callback_function(context):
    pass


def my_failure_callback_function(context):
    pass


def my_skipped_callback_function(context):
    pass


@task(
    on_execute_callback=my_execute_callback_function,
    on_retry_callback=my_retry_callback_function,
    on_success_callback=my_success_callback_function,
    on_failure_callback=my_failure_callback_function,
    on_skipped_callback=my_skipped_callback_function,
)
def t1():
    return "hello"
Traditional
from airflow.providers.standard.operators.python import PythonOperator


def my_execute_callback_function(context):
    pass


def my_retry_callback_function(context):
    pass


def my_success_callback_function(context):
    pass


def my_failure_callback_function(context):
    pass


def my_skipped_callback_function(context):
    pass


def say_hello():
    return "hello"


t1 = PythonOperator(
    task_id="t1",
    python_callable=say_hello,
    on_execute_callback=my_execute_callback_function,
    on_retry_callback=my_retry_callback_function,
    on_success_callback=my_success_callback_function,
    on_failure_callback=my_failure_callback_function,
    on_skipped_callback=my_skipped_callback_function,
)

Pre-built notifiers

Airflow notifiers are pre-built or custom classes and can be used to standardize and modularize the functions you use to send notifications. Notifiers can be passed to the relevant *_callback parameter of your Dag depending on what event you want to trigger the notification.

You can find a full list of all pre-built notifiers created for Airflow providers here and connect to many more services through the AppriseNotifier.

Notifiers are defined in provider packages or imported from the include folder and can be used across any of your Dags. This lets community members package functionality previously written as custom callback functions into shareable Airflow modules, creating pre-built callbacks that send notifications to other data tools.

Example pre-built notifier: Slack

An example of a community provided pre-built notifier is the SlackNotifier.

It can be imported from the Slack provider package and used with any *_callback function:

"""
Example showing how to use the SlackNotifier. Needs a Slack connection set
up with a Slack API Token for a Slack bot (starts with 'xoxb-...').
"""

from airflow.sdk import dag, task
from airflow.providers.slack.notifications.slack_notifier import SlackNotifier

SLACK_CONNECTION_ID = "slack_conn"
SLACK_CHANNEL = "alerts"
SLACK_MESSAGE = """
Hello! The {{ ti.task_id }} task is saying hi :wave:
Today is the {{ ds }} and this task finished with the state: {{ ti.state }} :tada:.
"""


@dag
def slack_notifier_example_dag():
    @task(
        on_success_callback=SlackNotifier(
            slack_conn_id=SLACK_CONNECTION_ID,
            text=SLACK_MESSAGE,
            channel=SLACK_CHANNEL,
        ),
    )
    def post_to_slack():
        return 10

    post_to_slack()


slack_notifier_example_dag()

The Dag above has one task sending a notification to Slack. It uses a Slack Airflow connection with the connection ID slack_conn.
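The connection itself can be created, for example, through an environment variable, following the same JSON format as the SMTP connection shown earlier. The sketch below assumes a Slack API (bot) token; the token value is a placeholder.

```shell
# Creates a Slack API connection with the ID slack_conn.
# Replace the token with your own bot token (starts with xoxb-).
export AIRFLOW_CONN_SLACK_CONN='{
    "conn_type": "slack",
    "password": "xoxb-your-bot-token"
}'
```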

Slack notification

Custom notifiers

If no notifier exists for your use case, you can write your own! Create an Airflow notifier by inheriting from the BaseNotifier class and defining the action to take in its .notify() method.

from airflow.sdk import BaseNotifier


class MyNotifier(BaseNotifier):
    """
    Basic notifier, prints the task_id, state, and a message.
    """

    template_fields = ("message",)

    def __init__(self, message):
        super().__init__()
        self.message = message

    def notify(self, context):
        t_id = context["ti"].task_id
        t_state = context["ti"].state
        print(
            f"Hi from MyNotifier! {t_id} finished as: {t_state} and says {self.message}"
        )

To use the custom notifier in a Dag, provide its instantiation to any callback parameter. For example:

Taskflow
from airflow.sdk import task

# MyNotifier is the custom notifier class defined above.


@task(
    on_failure_callback=MyNotifier(message="Hello failed!"),
)
def t1():
    return "hello"
Traditional
from airflow.providers.standard.operators.python import PythonOperator


def say_hello():
    return "hello"


t1 = PythonOperator(
    task_id="t1",
    python_callable=say_hello,
    on_failure_callback=MyNotifier(message="Hello failed!"),
)