Airflow plugins

Plugins are external features that you can add to customize your Airflow installation. Airflow imports them automatically at startup if they are located in the plugins folder of your Airflow project.

In this guide, you’ll learn how to add a plugin to your Airflow instance and what Airflow components can be part of a plugin.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

When to use plugins

Plugins offer a flexible way to customize your Airflow experience by building on top of existing Airflow components. For example, you can:

  • Add views to your Airflow UI that display contents of the Airflow metadata database in a custom way.
  • Build an application that monitors Airflow’s functioning and sends out custom alerts, for example in case of a certain number of DAGs failing in a specified time frame.
  • Add a button to the task instance Details view that dynamically links to files or logs in external data tools relevant to the task.

How to create a plugin

To add a new plugin to your Airflow instance, create a Python file in the plugins folder of your Airflow project. Within that file, create a class that inherits from AirflowPlugin to define the plugin. The code snippet below defines a plugin named empty without any components.

from airflow.plugins_manager import AirflowPlugin

class MyAirflowPlugin(AirflowPlugin):
    # name your plugin, this is mandatory
    name = "empty"

    ## Add plugin components
    # ...
    # ...
    # ...

    # Add an optional callback to perform actions when airflow starts and
    # the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    # to protect against extra parameters injected into the on_load()
    # function in future changes
    def on_load(*args, **kwargs):
        # perform Plugin boot actions
        pass

The list of currently active plugins can be viewed in the Airflow UI under Admin -> Plugins. The code above creates the following entry:

Empty plugin

In order for changes to your plugin to be registered, you will need to restart any Airflow components (e.g. the webserver or scheduler) that use the plugin. Learn more in the official Airflow documentation on plugins.

Plugin components

Functionality is added to a plugin by adding components to the class which defines the plugin. There are 10 types of plugin components that can be added to Airflow. In this guide we will show the more commonly used components, including:

  • appbuilder_menu_items allow you to add additional sections and links to the Airflow menu.
  • flask_blueprints and appbuilder_views offer the possibility to build a Flask project on top of Airflow.
  • operator_extra_links and global_operator_extra_links are ways to add links to Airflow task instances.
  • macros expand upon existing Jinja templates using custom functions.
  • listeners define custom code to execute whenever certain events happen anywhere in your Airflow instance.

Other types of plugin components not covered in this guide include:

  • timetables offer the option to register custom timetables that define schedules which cannot be expressed in CRON. See the Schedule DAGs in Airflow guide for more information and a code example.
  • executors add the possibility to use a custom executor in your Airflow instance.

Before Airflow 2.0, custom operators and hooks were added as plugins. This pattern has been deprecated, and custom operators and hooks can now be used simply by importing a script located in include.

Appbuilder menu items

You can update the menu at the top of the Airflow UI to contain custom tabs with links to external websites. Adding top-level menu items and adding sub-items are both supported.

If you are an Astro user, in order for the menu items to be visible to all users, the top-level menu item must be set to “Custom Menu” or one of the existing top-level menus such as “Docs” in the example below. This is done by passing the appropriate top-level menu to the category and name parameters in the menu sub-item.

The following code creates a plugin that adds two menu items.

  • apache_mitem_subitem is a sub-item which is added to Custom Menu. It is labeled Apache and links to the Apache homepage.
  • cosmos_mitem_subitem is a sub-item which is added to the Docs menu. It is labeled Cosmos Docs and links out to the documentation for Cosmos.

Both additional menu items are added to the appbuilder_menu_items component of a plugin called Menu items plugin, which is defined in the MyMenuItemsPlugin class.

from airflow.plugins_manager import AirflowPlugin

# creating a new sub-item in the Custom Menu item
apache_mitem_subitem = {
    "label": "Apache",
    "href": "https://www.apache.org/",
    "category": "Custom Menu",
    "name": "Custom Menu",
}

# creating a new sub-item in the Docs menu item
cosmos_mitem_subitem = {
    "label": "Cosmos Docs",
    "href": "https://astronomer.github.io/astronomer-cosmos/",
    "category": "Docs",
    "name": "Docs",
}

# defining the plugin class
class MyMenuItemsPlugin(AirflowPlugin):
    name = "Menu items plugin"

    # adding the menu items to the plugin
    appbuilder_menu_items = [apache_mitem_subitem, cosmos_mitem_subitem]

The screenshots below show the additional menu items in the Airflow UI.

Apache Menu Sub-Item

Cosmos Menu Sub-Item

If you are not using Astro, you can create a top-level menu item with any name. For example, you can directly create a top-level menu item named “Apache” that links to the Apache homepage with the below code.

from airflow.plugins_manager import AirflowPlugin

# creating the Apache top-level menu item
apache_mitem_toplevel = {
    "name": "Apache",
    "href": "https://www.apache.org/",
}

# defining the plugin class
class MyMenuItemsPlugin(AirflowPlugin):
    name = "Menu items plugin"

    # adding the menu item to the plugin
    appbuilder_menu_items = [apache_mitem_toplevel]

The screenshot below shows the top-level Apache menu item.

Apache Menu Top-Level Item

Flask Blueprints and Appbuilder views

Flask blueprints and views are plugin components that allow you to add more elaborate customization to the Airflow UI. A Flask blueprint functions as an organizational tool to group related views and supporting code, while Flask views render webpages from HTML templates.

To learn more, check out the Blueprints and Views tutorial in the official Flask documentation.

You can add a view to render a simple templated HTML file on top of the Airflow UI by following the steps below. The code also includes an additional example of creating an Appbuilder menu item (similar to the Cosmos Docs example) that can be accessed from the same top-level “Custom Menu” as the Appbuilder view.

  1. Create a folder within the plugins directory called templates.

  2. Within that folder, create an HTML file called test.html and copy the following code into it:

    {% extends "appbuilder/base.html" %}
    {% block content %}
        <p>Airflow is {{ content }}!</p>
    {% endblock %}

  3. In the plugins directory, add a Python file called my_first_view_plugin.py. Copy the code below. If you are using Astro, different code is required depending on whether your Airflow version is < 2.10 or >= 2.10.

Airflow >= 2.10

from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView
from airflow.configuration import conf

# from airflow.www.auth import has_access  # uncomment to use the plugin on Astro
# from airflow.security import permissions  # uncomment to use the plugin on Astro

CUSTOM_ASTRO_RESOURCE = "Custom Menu"

# define a Flask blueprint
my_blueprint = Blueprint(
    "test_plugin",
    __name__,
    # register airflow/plugins/templates as a Jinja template folder
    template_folder="templates",
)


# create a flask appbuilder BaseView
class MyBaseView(AppBuilderBaseView):
    default_view = "test"

    # If you are using Astro, uncomment the "has_access" line below, and do not change it
    @expose("/")
    # @has_access([(permissions.ACTION_CAN_ACCESS_MENU, CUSTOM_ASTRO_RESOURCE)])
    def test(self):
        # render the HTML file from the templates directory with content
        return self.render_template("test.html", content="awesome")


# instantiate MyBaseView
my_view = MyBaseView()

my_view_package = {
    "category": CUSTOM_ASTRO_RESOURCE,
    "name": CUSTOM_ASTRO_RESOURCE,
    "view": my_view,
    "label": "Custom Plugins",
}

# Appbuilder menu item without Flask view
google_mitem = {
    "category": CUSTOM_ASTRO_RESOURCE,
    "label": "Google",
    "href": "https://www.google.com",
    "name": CUSTOM_ASTRO_RESOURCE,
}


# define the plugin class
class MyViewPlugin(AirflowPlugin):
    # name the plugin
    name = "My appbuilder view"
    # add the blueprint and appbuilder components
    flask_blueprints = [my_blueprint]
    appbuilder_views = [my_view_package]
    appbuilder_menu_items = [google_mitem]

Airflow < 2.10

from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView
from airflow.configuration import conf

# from airflow.www.auth import has_access  # uncomment to use the plugin on Astro
# from airflow.security import permissions  # uncomment to use the plugin on Astro

CUSTOM_ASTRO_RESOURCE = "Custom Menu"

# define a Flask blueprint
my_blueprint = Blueprint(
    "test_plugin",
    __name__,
    # register airflow/plugins/templates as a Jinja template folder
    template_folder="templates",
)


# create a flask appbuilder BaseView
class MyBaseView(AppBuilderBaseView):
    default_view = "test"

    # If you are using Astro, uncomment the "has_access" line below, and do not change it
    @expose("/")
    # @has_access([(permissions.ACTION_CAN_ACCESS_MENU, CUSTOM_ASTRO_RESOURCE)])
    def test(self):
        # render the HTML file from the templates directory with content
        return self.render_template("test.html", content="awesome")


# instantiate MyBaseView
my_view = MyBaseView()

# get the base URL of the Airflow webserver
base_url = conf.get("webserver", "base_url")
view_link = f"{str(base_url)}/{my_view.__class__.__name__.lower()}"

# Note: The view below is without "name" due to a bug that doesn't propagate labels in Airflow < 2.10
my_view_package = {
    "category": CUSTOM_ASTRO_RESOURCE,
    "view": my_view,
}

# "my_mitem" below uses the "view_link" to make it reachable via a clickable link in Airflow < 2.10
my_mitem = {
    "label": "Custom Plugins",
    "href": view_link,
    "name": CUSTOM_ASTRO_RESOURCE,
    "category": CUSTOM_ASTRO_RESOURCE,
}

# Appbuilder menu item without Flask view
google_mitem = {
    "category": CUSTOM_ASTRO_RESOURCE,
    "label": "Google",
    "href": "https://www.google.com",
    "name": CUSTOM_ASTRO_RESOURCE,
}


# define the plugin class
class MyViewPlugin(AirflowPlugin):
    # name the plugin
    name = "My appbuilder view"
    # add the blueprint and appbuilder_views components
    flask_blueprints = [my_blueprint]
    appbuilder_views = [my_view_package]
    appbuilder_menu_items = [my_mitem, google_mitem]

  4. Start your Airflow instance using astro dev start, or astro dev restart if you were already running Airflow.

This plugin will add a top-level menu item called Custom Menu which contains the sub-items Custom Plugins and Google.

Custom Menu

By clicking on Custom Plugins you can access the Flask View that was defined as my_view. It shows the HTML template (test.html) rendered with the provided content.

Custom Plugins View

Clicking on Google will take you to the URL https://www.google.com, similar to the Cosmos Docs example in the previous Appbuilder menu items section.

If you want to use custom menu items in an Airflow environment hosted on Astro, you must give your plugin the necessary permissions. To do this with Appbuilder views, use the @has_access decorator to give your BaseView method the ACTION_CAN_ACCESS_MENU permission on the “Custom Menu” resource (defined by the CUSTOM_ASTRO_RESOURCE variable). Astronomer registers the “Custom Menu” resource in the Airflow Security Manager to allow users to create custom menu items.

from airflow.www.auth import has_access
from airflow.security import permissions

# ...

CUSTOM_ASTRO_RESOURCE = "Custom Menu"

# ...

# create a flask appbuilder BaseView
class MyBaseView(AppBuilderBaseView):
    default_view = "test"

    # Do not change the "has_access" line below
    @expose("/")
    @has_access([(permissions.ACTION_CAN_ACCESS_MENU, CUSTOM_ASTRO_RESOURCE)])
    def test(self):
        # render the HTML file from the templates directory with content
        return self.render_template("test.html", content="awesome")

Airflow environments hosted on Astro must have “Custom Menu” or one of the existing top-level menu items such as “Docs”, “Browse”, or “Astronomer” passed as the category and name value in their menu sub-items for both Appbuilder views and menu items. This is because the category and name values must match a permission in the Airflow Security Manager. With “Custom Menu” as the category and name values, the menu item will be mapped to the existing “Custom Menu” resource in the Airflow Security Manager. The Airflow Security Manager also contains resources for “Docs”, “Browse”, and “Astronomer”. Therefore, these values can also be passed to category and name. If a different value is passed to category and name, no users besides the Workspace Owner will be able to see the custom plugin since a corresponding resource would not exist in the Airflow Security Manager, and the appropriate permissions will not be mapped to the menu sub-items.

CUSTOM_ASTRO_RESOURCE = "Custom Menu"

# ...

# Menu sub-item
google_mitem = {
    "category": CUSTOM_ASTRO_RESOURCE,
    "label": "Google",
    "href": "https://www.google.com",
    "name": CUSTOM_ASTRO_RESOURCE,
}
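
The category and name rule described above can be enforced with a small helper that builds menu sub-items and rejects values that have no matching resource. This is only a sketch: the function name astro_menu_subitem and the ALLOWED_TOP_LEVEL_MENUS set are hypothetical and not part of Airflow or Astro, and the set simply lists the four top-level menu names mentioned in this guide.

```python
# Hypothetical helper, not part of Airflow or Astro.
# The allowed names are the four top-level menus named in this guide.
ALLOWED_TOP_LEVEL_MENUS = {"Custom Menu", "Docs", "Browse", "Astronomer"}


def astro_menu_subitem(label: str, href: str, top_level: str = "Custom Menu") -> dict:
    """Build a menu sub-item dict whose category and name both equal top_level."""
    if top_level not in ALLOWED_TOP_LEVEL_MENUS:
        raise ValueError(
            f"{top_level!r} has no matching resource; "
            f"use one of {sorted(ALLOWED_TOP_LEVEL_MENUS)}"
        )
    # category and name carry the same value so both map to the same resource
    return {"label": label, "href": href, "category": top_level, "name": top_level}


google_mitem = astro_menu_subitem("Google", "https://www.google.com")
```

A dict built this way could be placed in the appbuilder_menu_items list of a plugin class in the same way as the hand-written dicts in this guide.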

The code to add the view is different for Airflow < 2.10 since there is a bug with the label field for views in those versions. As a workaround, we do not pass the name attribute to the view. Airflow checks for that name attribute to decide whether it should use the appbuilder.add_view method or appbuilder.add_view_no_menu. Because we don’t pass name, it creates a view with “No menu”.

my_view_package = {
    "category": CUSTOM_ASTRO_RESOURCE,
    "view": my_view,
}

Now we create a “menu item” my_mitem with the label “Custom Plugins” and add an explicit link to the view created with “No menu”. To get the correct link, we use the following code:

from airflow.configuration import conf

# ...

base_url = conf.get("webserver", "base_url")
view_link = f"{str(base_url)}/{my_view.__class__.__name__.lower()}"

And then pass it like this:

my_mitem = {
    "label": "Custom Plugins",
    "href": view_link,
    "name": CUSTOM_ASTRO_RESOURCE,
    "category": CUSTOM_ASTRO_RESOURCE,
}
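
The link construction used for Airflow < 2.10 can be tried in isolation. The sketch below mirrors the f-string from this guide in a plain function so that it runs without Airflow. The function name build_view_link, the stand-in MyBaseView class, and the base URL http://localhost:8080/ are assumptions for illustration; the trailing-slash handling is an addition that the original snippet does not perform.

```python
def build_view_link(base_url: str, view_class_name: str) -> str:
    """Join the webserver base URL and the lowercased view class name."""
    # Strip a trailing slash so the result never contains "//" before the route.
    return f"{base_url.rstrip('/')}/{view_class_name.lower()}"


class MyBaseView:  # stand-in for the AppBuilder view class defined in the plugin
    pass


view_link = build_view_link("http://localhost:8080/", MyBaseView.__name__)
print(view_link)  # http://localhost:8080/mybaseview
```

In the plugin itself, the base URL would come from conf.get("webserver", "base_url") as shown in the snippet above rather than from a hard-coded string.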

The bug is fixed in Airflow 2.10. Therefore, we can directly pass the name argument to my_view_package. Since name is used, Airflow will directly call appbuilder.add_view for my_view_package. Therefore, we do not have a separate my_mitem definition like we do in Airflow < 2.10.

Operator extra links

Operator extra links are additional buttons with links that can be added to specific operators. They can be defined as Python classes derived from the BaseOperatorLink class. The example below shows how to create a new operator extra link MyLink that is applied to MyOperator1 and MyOperator2. The operator extra link is registered with the MyOperatorExtraLink plugin by adding it to its operator_extra_links list.

from airflow.models.baseoperator import BaseOperatorLink
from include.custom_operators import MyOperator1, MyOperator2
from airflow.plugins_manager import AirflowPlugin

# create the operator extra link
class MyLink(BaseOperatorLink):

    # name the link button
    name = "My extra link"

    # add the link button to one or more operators
    operators = [MyOperator1, MyOperator2]

    # function determining the link
    def get_link(self, operator, *, ti_key=None):
        return "http://my_link.com/"

# add the operator extra link to a plugin
class MyOperatorExtraLink(AirflowPlugin):
    name = "my_plugin_name"
    operator_extra_links = [
        MyLink(),
    ]

The screenshot below shows an operator extra link called “HTTP cat” that was added to the custom CatHttpOperator. For more instructions, see this step-by-step tutorial on how to add operator extra links.

Cat Button
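
The get_link method in the example above returns a fixed URL, but a link can also be derived from the task instance. The following sketch shows one way such a URL could be assembled, assuming that the ti_key argument exposes dag_id, task_id, and run_id attributes. FakeTaskInstanceKey, build_log_link, and the logs.example.com address are hypothetical stand-ins so that the snippet runs without Airflow.

```python
from typing import NamedTuple
from urllib.parse import quote


class FakeTaskInstanceKey(NamedTuple):
    """Stand-in for the ti_key argument, used only to run this sketch without Airflow."""

    dag_id: str
    task_id: str
    run_id: str


def build_log_link(ti_key, base: str = "https://logs.example.com") -> str:
    """Return a per-task URL; the base URL and path layout are made up."""
    parts = (ti_key.dag_id, ti_key.task_id, ti_key.run_id)
    # Percent-encode each part so characters such as ":" and "+" in run IDs are URL-safe.
    return base + "/" + "/".join(quote(part, safe="") for part in parts)


key = FakeTaskInstanceKey("my_dag", "my_task", "manual__2024-01-01T00:00:00+00:00")
print(build_log_link(key))
```

Inside a BaseOperatorLink subclass, get_link could return the result of a function like this instead of a constant string.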

Global operator extra links

Global operator extra links are additional buttons with links that will be added to every operator. This example adds a button named Airflow docs to all operators that links out to the Airflow documentation.

from airflow.models.baseoperator import BaseOperatorLink
from airflow.plugins_manager import AirflowPlugin

class GlobalLink(BaseOperatorLink):
    # name the link button
    name = "Airflow docs"

    # function determining the link
    def get_link(self, operator, *, ti_key=None):
        return "https://airflow.apache.org/"

# add the operator extra link to a plugin
class MyGlobalLink(AirflowPlugin):
    name = "my_plugin_name"
    global_operator_extra_links = [
        GlobalLink(),
    ]

You can access the button on task instances in both the Graph and Grid views.

Airflow Docs Button

Macros

In Airflow, you can define custom macros which can be accessed using Jinja templating. Macros can be added at the DAG level by defining them in the DAG parameter user_defined_macros, as shown in Using Airflow templates. If you want to make macros available to your whole Airflow instance, you can register them as a plugin.

Common use cases for custom macros include:

  • Injecting dynamic datetime objects into DAG code in formats not available in pre-defined macros. For example, converting the {{ ts }} predefined macro, which provides the logical date of the DAG as a timestamp from YYYY-MM-DDThh:mm:ss+00:00 to hh:mm.
  • Injecting dynamic arguments into DAG code based on Python logic. For example, passing a different argument to a function on weekdays versus the weekend.
  • Injecting dynamic arguments into DAG code based on XCom values. For example, using a different target blob storage depending on how many files will be ingested, a count determined and pushed to XCom by an upstream task.

from airflow.plugins_manager import AirflowPlugin
from random import randint

def random_number_macro():
    return randint(0, 1000)

class MyAirflowMacro(AirflowPlugin):
    name = "my_macro_plugin"
    macros = [
        random_number_macro,
    ]

The code above creates a macro that returns a random number between 0 and 1000. It can be referenced using Jinja templating in any templateable parameter of any operator. The code snippet below shows how the macro can be used within the bash_command parameter of the BashOperator to print out a random number.

from airflow.operators.bash import BashOperator

use_plugin_macro = BashOperator(
    task_id="use_plugin_macro",
    bash_command="echo {{ macros.my_macro_plugin.random_number_macro() }}",
)
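
Two of the use cases listed above, reformatting {{ ts }} to hh:mm and distinguishing weekdays from weekends, can be written as plain functions. This is a minimal sketch: the function names ts_to_hh_mm and weekday_or_weekend are hypothetical, and the functions are shown without the plugin class so that they run without Airflow.

```python
from datetime import date, datetime


def ts_to_hh_mm(ts: str) -> str:
    """Convert an ISO 8601 timestamp such as the value of {{ ts }} to hh:mm."""
    return datetime.fromisoformat(ts).strftime("%H:%M")


def weekday_or_weekend(ds: str) -> str:
    """Return 'weekend' for Saturday and Sunday dates (YYYY-MM-DD), else 'weekday'."""
    # date.weekday() numbers Monday as 0, so 5 and 6 are Saturday and Sunday.
    return "weekend" if date.fromisoformat(ds).weekday() >= 5 else "weekday"


print(ts_to_hh_mm("2024-01-01T13:45:00+00:00"))  # 13:45
print(weekday_or_weekend("2024-01-06"))  # weekend
```

Assuming these functions were added to the macros list of the my_macro_plugin plugin shown earlier, a template could call them in the same way as random_number_macro, for example {{ macros.my_macro_plugin.ts_to_hh_mm(ts) }}.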

Airflow listeners

Airflow listeners allow you to execute custom code when certain events occur anywhere in your Airflow instance, for example whenever any DAG run fails or an update to any dataset is detected.

Listeners execute based on the event they are waiting for and are not attached to a task or DAG. This is in contrast to Airflow callbacks which are attached to a specific DAG or (set of) task(s) or Airflow dataset consumer DAGs that only execute when a specific (set of) dataset(s) is updated. While the listener itself will execute any time its event occurs, you can use conditional logic to determine whether or not to execute the code within the listener. For example, you can use a listener to send a Slack notification when a DAG run fails, but only if the DAG run was not manually triggered.

You can create a listener using the @hookimpl decorator on functions defined with the same name and parameters as listed in the listeners spec source code.

For example, the @hookspec of the on_task_instance_failed function is:

@hookspec
def on_task_instance_failed(
    previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None
):
    """Execute when task state changes to FAIL. previous_state can be None."""

In order to create a listener that executes whenever any task instance fails in your whole Airflow environment, you need to define a function called on_task_instance_failed that takes three parameters: previous_state, task_instance and session. Then, you decorate it with @hookimpl.

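from __future__ import annotations

from airflow.listeners import hookimpl
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import TaskInstanceState
from sqlalchemy.orm.session import Session

@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None
):
    # Your code here
    pass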

The listener file can then be registered as a plugin by adding it to the listeners component of the plugin class.

from airflow.plugins_manager import AirflowPlugin
from plugins import listener_code

class MyListenerPlugin(AirflowPlugin):
    name = "my_listener_plugin"
    listeners = [listener_code]

To see a full example of how to create and register a listener, check out the Use a listener to send a Slack notification when a Dataset is updated tutorial.
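
The conditional logic mentioned earlier in this section, notifying on a failed DAG run only when it was not manually triggered, can be kept in a small function that a listener calls. The sketch below assumes that the DAG run object has a run_type attribute equal to "manual" for manually triggered runs. The function name should_notify is hypothetical, and SimpleNamespace objects stand in for real DAG runs so that the snippet runs without Airflow.

```python
from types import SimpleNamespace


def should_notify(dag_run) -> bool:
    """Return True unless the failed DAG run was manually triggered."""
    # Assumption: run_type is "manual" for runs triggered by hand.
    return dag_run.run_type != "manual"


scheduled_run = SimpleNamespace(run_type="scheduled")
manual_run = SimpleNamespace(run_type="manual")

print(should_notify(scheduled_run))  # True
print(should_notify(manual_run))  # False
```

A listener function decorated with @hookimpl for DAG run failures could call should_notify first and return early when it is False. Check the listeners spec source code for the exact hook name and parameters in your Airflow version.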