Airflow operators
Operators are one of the building blocks of Airflow DAGs. There are many different types of operators available in Airflow. The PythonOperator can execute any Python function and is functionally equivalent to using the @task decorator, while other operators contain pre-built logic to perform a specific task, such as executing a Bash script (BashOperator) or running a SQL query in a relational database (SQLExecuteQueryOperator). Operators are used alongside other building blocks, such as decorators and hooks, to create tasks in a DAG written with the task-oriented approach. Operator classes can be imported from Airflow provider packages.
In this guide, you'll learn the basics of using operators in Airflow.
To view a list of the operators available in different Airflow provider packages, go to the Astronomer Registry.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Basic Airflow concepts. See Introduction to Apache Airflow.
- Basic Python. See the Python Documentation.
Operator basics
Operators are Python classes that encapsulate logic to do a unit of work. They can be viewed as a wrapper around each unit of work that defines the actions to be completed and abstracts away most of the code you would otherwise need to write. When you create an instance of an operator in a DAG and provide it with its required parameters, it becomes a task.
A base set of operators is contained in the Airflow standard provider package, which is pre-installed when using the Astro CLI. Other operators are contained in specialized provider packages, often centered around a specific technology or service. For example, the Airflow Snowflake provider package contains operators for interacting with Snowflake, while the Airflow Google provider package contains operators for interacting with Google Cloud services. There are also provider packages whose operators work across a set of related services, such as the Common SQL provider package, whose SQLExecuteQueryOperator can run queries against many different relational databases.
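As a sketch of how provider packages map to imports, the snippet below pulls operators from the standard, Snowflake, and Google provider packages. The specific Snowflake and Google operator classes shown here are illustrative examples and assume their provider packages are installed in your environment.

```python
# Illustrative only: assumes the apache-airflow-providers-snowflake and
# apache-airflow-providers-google packages are installed.
from airflow.providers.standard.operators.bash import BashOperator  # standard provider
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator  # Snowflake provider
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator  # Google provider
```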
Operator examples
Following are some of the most frequently used Airflow operators. Note that only a few of the possible parameters are shown; refer to the Astronomer Registry for a full list of parameters for each operator.
- PythonOperator: Executes a Python function. It is functionally equivalent to using the @task decorator. See Introduction to the TaskFlow API and Airflow decorators, and the @task sketch after this list.

  ```python
  from airflow.providers.standard.operators.python import PythonOperator


  def _my_python_function():
      print("Hello world!")


  my_task = PythonOperator(
      task_id="my_task",
      python_callable=_my_python_function,
  )
  ```
- BashOperator: Executes a bash script. See also the Using the BashOperator guide.

  ```python
  from airflow.providers.standard.operators.bash import BashOperator

  my_task = BashOperator(
      task_id="my_task",
      bash_command="echo 'Hello world!'",
  )
  ```
- KubernetesPodOperator: Executes a task defined as a Docker image in a Kubernetes Pod. See Use the KubernetesPodOperator.

  ```python
  from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

  my_task = KubernetesPodOperator(
      task_id="my_task",
      kubernetes_conn_id="<my-kubernetes-connection>",
      name="<my-pod-name>",
      namespace="<my-namespace>",
      image="python:3.12-slim",  # Docker image to run
      cmds=["python", "-c"],  # Command to run in the container
      arguments=["print('Hello world!')"],  # Arguments to the command
  )
  ```
- SQLExecuteQueryOperator: Executes a SQL query against a relational database.

  ```python
  from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

  my_task = SQLExecuteQueryOperator(
      task_id="my_task",
      sql="SELECT * FROM my_table",
      database="<my-database>",
      conn_id="<my-connection>",
  )
  ```
- EmptyOperator: A no-op operator that does nothing. This is useful for creating placeholder tasks in a DAG.

  ```python
  from airflow.providers.standard.operators.empty import EmptyOperator

  my_task = EmptyOperator(task_id="my_task")
  ```
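As noted in the PythonOperator example above, the same task can be written with the @task decorator instead. The following is a minimal, functionally equivalent sketch using the TaskFlow API:

```python
from airflow.sdk import task


# Equivalent to the PythonOperator example: the decorated function becomes a task.
@task
def my_task():
    print("Hello world!")
```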
All operators inherit from the abstract BaseOperator class, which contains the logic to execute the work of the operator within the context of a DAG.
Arguments of the BaseOperator class can be passed to all operators. The most common arguments are:
- task_id: A unique identifier for the task. This is required for all operators.
- retries: The number of times to retry the task if it fails. This is optional and defaults to 0. See Rerun Airflow DAGs and tasks.
- pool: The name of the pool to use for the task. This is optional and defaults to None. See Airflow pools.
- execution_timeout: The maximum time to wait for the task to complete. This is optional and defaults to None. It is a good practice to set this value to prevent tasks from running indefinitely.
You can set these arguments and other BaseOperator arguments (other than task_id, which needs to be unique per operator) at the DAG level for all tasks in a DAG by using the default_args dictionary. You can override these values for individual tasks by setting the same arguments in the task definition.
```python
from airflow.sdk import chain, dag, task
from airflow.providers.standard.operators.bash import BashOperator
from pendulum import duration


@dag(
    default_args={
        "retries": 3,
        "retry_delay": duration(minutes=5),
        "execution_timeout": duration(minutes=30),
    },  # default arguments for all tasks in the DAG
)
def my_dag23():
    # this task is defined using the BashOperator
    my_bash_task = BashOperator(
        task_id="my_bash_task",
        bash_command="echo 'Hello world!'",
        retries=2,  # Override the default retry value
        retry_delay=duration(minutes=2),  # Override the default retry delay
    )

    # this task is defined using the @task decorator,
    # which is functionally equivalent to the PythonOperator
    @task(
        retries=5,  # Override the default retry value
        priority_weight=10,  # Give this task a higher priority
    )
    def my_decorated_task():
        print("Hello world!")

    # Set the task dependencies
    chain(
        my_bash_task,
        my_decorated_task(),
    )


my_dag23()
```
Best practices
Operators typically only require a few parameters. Keep the following considerations in mind when using Airflow operators:
- The Astronomer Registry is the best resource for learning what operators are available and how they are used.
- The Airflow standard provider package includes basic operators such as the PythonOperator and BashOperator. These operators are automatically available in your Airflow environment if you are using the Astro CLI. All other operators are part of provider packages, some of which you must install separately, depending on what type of Airflow distribution you are using.
- You can combine operators and decorators freely in the same DAG. Many users choose to use the @task decorator for most of their tasks and add operators for tasks where a specialized operator exists for their use case. The example above shows a DAG with one operator (BashOperator) and one @task-decorated task.
- If an operator exists for your specific use case, you should use it instead of your own Python functions or hooks. This makes your DAGs easier to read and maintain.
- If an operator doesn't exist for your use case, you can either use custom Python code in an @task-decorated task or the PythonOperator, or extend an existing operator to meet your needs. For more information about customizing operators, see Custom hooks and operators.
- Sensors are a type of operator that waits for something to happen. They can be used to detect events in systems outside of Airflow.
- Deferrable operators release their worker slot while waiting for their work to complete, which can result in cost savings and greater scalability. Astronomer recommends using a deferrable operator whenever one exists for your use case and your task takes longer than a minute. Many operators that need to wait for something offer a deferrable mode, which you can enable by setting their deferrable parameter to True. See the sensor sketch after this list.
- Any operator that interacts with a service external to Airflow typically requires a connection so that Airflow can authenticate to that external system. For more information about setting up connections, see Managing your connections in Apache Airflow.
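As an illustration of the last few points, the sketch below runs a sensor in deferrable mode. It assumes the Amazon provider package is installed and that the connection, bucket, and key shown exist; all names are placeholders.

```python
# A minimal sketch of a deferrable sensor, assuming the
# apache-airflow-providers-amazon package is installed and an AWS connection
# is configured. Connection ID, bucket name, and key are placeholders.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id="wait_for_file",
    aws_conn_id="<my-aws-connection>",
    bucket_name="<my-bucket>",
    bucket_key="data/my_file.csv",
    deferrable=True,  # release the worker slot while waiting for the key to appear
)
```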