Custom hooks and operators

One of the great benefits of Airflow is its vast ecosystem of provider packages, which offer hooks, operators, and sensors for many common use cases. Another great benefit is that Airflow is highly customizable because everything is defined in Python code. If the hook, operator, or sensor you need doesn’t exist in the open source ecosystem, you can easily define your own.

In this guide, you’ll learn how to define your own custom Airflow operators and hooks to use in your DAGs. To explore existing hooks, operators, and sensors, visit the Astronomer Registry.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

  • Basic Airflow concepts. See Introduction to Apache Airflow.
  • Airflow operators. See Operators 101.
  • Airflow hooks. See Hooks 101.
  • Airflow connections. See Manage connections in Apache Airflow.

Create a custom operator

A custom operator is a Python class which can be imported into your DAG file. Like regular operators, instantiating a custom operator will create an Airflow task.

At a minimum, a custom operator must:

  • Inherit from the BaseOperator or any other existing operator.
  • Define an .__init__() method which runs when the DAG is parsed.
  • Define an .execute() method which runs when a task uses this operator.

Optionally you can:

  • Define a .pre_execute() method which runs before the .execute() method. This is particularly useful for adding functionality to an existing operator without the need to override the .execute() method.
  • Define a .post_execute() method which runs after the .execute() method. post_execute() is useful for logging or cleanup tasks that should run after the main task logic, or to push additional information to XCom. The return value of .execute() is passed to .post_execute() as the result argument.

The following is an example of a custom operator called MyOperator:

# import the operator to inherit from
from airflow.models.baseoperator import BaseOperator


# define the class inheriting from an existing operator class
class MyOperator(BaseOperator):
    """
    Simple example operator that logs one parameter and returns a string saying hi.

    :param my_parameter: (required) parameter taking any input.
    """

    # define the .__init__() method that runs when the DAG is parsed
    def __init__(self, my_parameter, *args, **kwargs):
        # initialize the parent operator
        super().__init__(*args, **kwargs)
        # assign class variables
        self.my_parameter = my_parameter

    # define the .pre_execute() method that runs before the execute method (optional)
    def pre_execute(self, context):
        # write to Airflow task logs
        self.log.info("Pre-execution step")

    # define the .execute() method that runs when a task uses this operator.
    # The Airflow context must always be passed to '.execute()', so make
    # sure to include the 'context' kwarg.
    def execute(self, context):
        # write to Airflow task logs
        self.log.info(self.my_parameter)
        # the return value of '.execute()' will be pushed to XCom by default
        return "hi :)"

    # define the .post_execute() method that runs after the execute method (optional);
    # 'result' is the return value of the execute method
    def post_execute(self, context, result=None):
        # write to Airflow task logs
        self.log.info("Post-execution step")

If your custom operator modifies the functionality of an existing operator, your class can inherit from the operator you are building on instead of from the BaseOperator. For more detailed instructions, see Creating a custom Operator in the Apache Airflow documentation.
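
For example, the following sketch (the class name and logging behavior are hypothetical) extends the BashOperator rather than the BaseOperator:

from airflow.operators.bash import BashOperator


class LoggingBashOperator(BashOperator):
    """Hypothetical operator that adds extra logging to the BashOperator."""

    def pre_execute(self, context):
        # log the command before handing off to the parent operator's logic
        self.log.info("About to run bash command: %s", self.bash_command)
        super().pre_execute(context)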

:::info

It is possible to pass a callable to any operator’s pre_execute or post_execute parameter to inject custom logic into it without needing to define a custom operator. Note that this feature is considered experimental.
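
For example, the following sketch (the task ID and callable are hypothetical) injects a logging step into a standard BashOperator without subclassing it:

from airflow.operators.bash import BashOperator


def log_task_start(context):
    # runs right before the task's execute method
    print(f"Starting task: {context['ti'].task_id}")


say_hello = BashOperator(
    task_id="say_hello",
    bash_command="echo hello",
    pre_execute=log_task_start,
)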

:::

Create a custom hook

A custom hook is a Python class which can be imported into your DAG file. Like regular hooks, custom hooks can be used to create connections to external tools from within your task code. Custom hooks often contain methods that interact with an external API, which makes them a cleaner building block for custom operators than scattering direct API calls through your task code.

At a minimum, a custom hook must:

  • Inherit from the BaseHook or any other existing hook.
  • Define an .__init__() method which runs when the DAG is parsed.

Many hooks include a .get_conn() method that wraps a call to the BaseHook method .get_connection() to retrieve information from an Airflow connection. It is common to call .get_conn() within the .__init__() method. The following is the minimum recommended code to start with for most custom hooks:

# import the hook to inherit from
from airflow.hooks.base import BaseHook


# define the class inheriting from an existing hook class
class MyHook(BaseHook):
    """
    Interact with <external tool>.

    :param my_conn_id: ID of the connection to <external tool>
    """

    # provide the name of the parameter which receives the connection id
    conn_name_attr = "my_conn_id"
    # provide a default connection id
    default_conn_name = "my_conn_default"
    # provide the connection type
    conn_type = "general"
    # provide the name of the hook
    hook_name = "MyHook"

    # define the .__init__() method that runs when the DAG is parsed
    def __init__(
        self, my_conn_id: str = default_conn_name, *args, **kwargs
    ) -> None:
        # initialize the parent hook
        super().__init__(*args, **kwargs)
        # assign class variables
        self.my_conn_id = my_conn_id
        # (optional) call the '.get_conn()' method upon initialization
        self.get_conn()

    def get_conn(self):
        """Initiate a new connection to your external tool."""
        # retrieve the passed connection id
        conn_id = getattr(self, self.conn_name_attr)
        # get the connection object from the Airflow connection
        conn = self.get_connection(conn_id)

        return conn

    # add additional methods to define interactions with your external tool
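
Once defined, a custom hook can be used like any built-in hook, for example inside a custom operator's .execute() method or a TaskFlow task. The following is a minimal sketch, assuming the hook file is importable as described in the next section and that a connection with the ID my_conn_default exists:

from airflow.decorators import task

from include.custom_hooks.my_hook import MyHook


@task
def use_my_hook():
    # instantiate the hook with a specific Airflow connection ID
    hook = MyHook(my_conn_id="my_conn_default")
    # the returned Connection object exposes host, login, password, and extras
    conn = hook.get_conn()
    return conn.host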

Import custom hooks and operators

After you’ve defined a custom hook or operator, you need to make it available to your DAGs. Some legacy Airflow documentation or forums may reference registering your custom operator as an Airflow plugin, but this is not necessary. To import a custom operator or hook to your DAGs, the operator or hook file needs to be in a directory that is present in your PYTHONPATH. See the Apache Airflow module management documentation for more info.

When using the Astro CLI you can add your custom operator file to the include directory of your Astro project. Consider adding sub-folders to make your include directory easier to navigate.

.
├── .astro/
├── dags/
│   └── example_dag.py
├── include/
│   ├── custom_operators/
│   │   └── my_operator.py
│   └── custom_hooks/
│       └── my_hook.py
├── plugins/
├── tests/
├── .dockerignore
├── .env
├── .gitignore
├── airflow_settings.yaml
├── Dockerfile
├── packages.txt
├── README.md
└── requirements.txt

For more details on why Astronomer recommends this project structure, see the Managing Airflow Code guide.

Using the project structure shown above, you can import the MyOperator class from the my_operator.py file and the MyHook class from the my_hook.py file in your DAGs with the following import statements:

from include.custom_operators.my_operator import MyOperator
from include.custom_hooks.my_hook import MyHook

Example implementation

The following code defines the MyBasicMathOperator class. This operator inherits from the BaseOperator and can perform arithmetic when you provide it with two numbers and an operation. This code is saved in the include folder in a file called basic_math_operator.py.

from airflow.models.baseoperator import BaseOperator


class MyBasicMathOperator(BaseOperator):
    """
    Example operator that does basic arithmetic.

    :param first_number: first number to put into an equation
    :param second_number: second number to put into an equation
    :param operation: mathematical operation to perform
    """

    # provide a list of valid operations
    valid_operations = ("+", "-", "*", "/")
    # define which fields can use Jinja templating
    template_fields = ("first_number", "second_number")

    def __init__(
        self,
        first_number: float,
        second_number: float,
        operation: str,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.first_number = first_number
        self.second_number = second_number
        self.operation = operation

        # raise a ValueError if the operation provided is not valid
        if self.operation not in self.valid_operations:
            raise ValueError(
                f"{self.operation} is not a valid operation. Choose one of {self.valid_operations}"
            )

    def execute(self, context):
        self.log.info(
            f"Equation: {self.first_number} {self.operation} {self.second_number}"
        )
        if self.operation == "+":
            res = self.first_number + self.second_number
            self.log.info(f"Result: {res}")
            return res
        if self.operation == "-":
            res = self.first_number - self.second_number
            self.log.info(f"Result: {res}")
            return res
        if self.operation == "*":
            res = self.first_number * self.second_number
            self.log.info(f"Result: {res}")
            return res
        if self.operation == "/":
            try:
                res = self.first_number / self.second_number
            except ZeroDivisionError:
                self.log.critical(
                    "If you have set up an equation where you are trying to divide by zero, you have done something WRONG. - Randall Munroe, 2006"
                )
                raise

            self.log.info(f"Result: {res}")
            return res
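
Because the operator's logic lives in plain Python methods, you can also exercise it outside of a DAG run, for example in a quick unit test. The following sketch assumes the file lives at include/custom_operators/basic_math_operator.py:

from include.custom_operators.basic_math_operator import MyBasicMathOperator


def test_addition():
    op = MyBasicMathOperator(
        task_id="test_add", first_number=2, second_number=3, operation="+"
    )
    # .execute() can be called directly with an empty context for testing
    assert op.execute(context={}) == 5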

In addition to the custom operator, the example DAG uses a custom hook to connect to the CatFactAPI. This hook abstracts retrieving the API URL from an Airflow connection and makes several calls to the API in a loop. This code should also be placed in the include directory in a file called cat_fact_hook.py.

1"""This module allows you to connect to the CatFactAPI."""
2
3from airflow.hooks.base import BaseHook
4class CatFactHook(BaseHook):
5 """
6 Interact with the CatFactAPI.
7
8 Performs a connection to the CatFactAPI and retrieves a cat fact client.
9
10 :cat_fact_conn_id: Connection ID to retrieve the CatFactAPI url.
11 """
12
13 conn_name_attr = "cat_conn_id"
14 default_conn_name = "cat_conn_default"
15 conn_type = "http"
16 hook_name = "CatFact"
17
18 def __init__(
19 self, cat_fact_conn_id: str = default_conn_name, *args, **kwargs
20 ) -> None:
21 super().__init__(*args, **kwargs)
22 self.cat_fact_conn_id = cat_fact_conn_id
23 self.get_conn()
24
25 def get_conn(self):
26 """Function that initiates a new connection to the CatFactAPI."""
27
28 # get the connection object from the Airflow connection
29 conn = self.get_connection(self.cat_fact_conn_id)
30
31 # return the host URL
32 return conn.host
33
34 def log_cat_facts(self, number_of_cat_facts_needed: int = 1):
35 """Function that logs between 1 to 10 catfacts depending on its input."""
36 if number_of_cat_facts_needed < 1:
37 self.log.info(
38 "You will need at least one catfact! Setting request number to 1."
39 )
40 number_of_cat_facts_needed = 1
41 if number_of_cat_facts_needed > 10:
42 self.log.info(
43 f"{number_of_cat_facts_needed} are a bit many. Setting request number to 10."
44 )
45 number_of_cat_facts_needed = 10
46
47 cat_fact_connection = self.get_conn()
48
49 # log several cat facts using the connection retrieved
50 for i in range(number_of_cat_facts_needed):
51 cat_fact = re.get(cat_fact_connection).json()
52 self.log.info(cat_fact["fact"])
53 return f"{i} catfacts written to the logs!"

To use this custom hook, you need to create an Airflow connection with the connection ID cat_fact_conn, the connection type HTTP, and the Host http://catfact.ninja/fact.

(Image: the cat fact connection configured in the Airflow UI)

You can then import the custom operator and custom hook into your DAG. Because the custom operator defines first_number and second_number as template_fields, you can pass values from other tasks to these parameters using Jinja templating, as shown in the example DAG below.
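
The following is a minimal sketch of such a DAG. The DAG ID, task IDs, and values are illustrative, and the import paths assume the operator and hook are saved as include/custom_operators/basic_math_operator.py and include/custom_hooks/cat_fact_hook.py:

from pendulum import datetime

from airflow.decorators import dag, task

from include.custom_hooks.cat_fact_hook import CatFactHook
from include.custom_operators.basic_math_operator import MyBasicMathOperator


@dag(
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    # render templated fields as native Python objects so the value pulled
    # from XCom arrives as a number rather than a string
    render_template_as_native_obj=True,
)
def custom_operator_example():
    @task
    def pick_a_number():
        return 23

    # first_number is listed in template_fields, so it accepts a Jinja expression
    add_19 = MyBasicMathOperator(
        task_id="add_19",
        first_number="{{ ti.xcom_pull(task_ids='pick_a_number') }}",
        second_number=19,
        operation="+",
    )

    @task
    def get_cat_facts():
        # uses the cat_fact_conn connection created above
        return CatFactHook("cat_fact_conn").log_cat_facts(3)

    pick_a_number() >> add_19 >> get_cat_facts()


custom_operator_example()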
