Airflow operators

Operators are one of the building blocks of Airflow DAGs. There are many different types of operators available in Airflow. The PythonOperator can execute any Python function and is functionally equivalent to using the @task decorator, while other operators contain pre-created logic to perform a specific task, such as executing a Bash script (BashOperator) or running a SQL query in a relational database (SQLExecuteQueryOperator). Operators are used alongside other building blocks, such as decorators and hooks, to create tasks in a DAG written with the task-oriented approach. Operator classes can be imported from Airflow provider packages.

In this guide, you’ll learn the basics of using operators in Airflow.

To view a list of the operators available in different Airflow provider packages, go to the Astronomer Registry.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Operator basics

Operators are Python classes that encapsulate the logic for a unit of work. You can think of an operator as a wrapper around that unit of work: it defines the actions to be completed and abstracts away most of the code you would otherwise need to write. When you create an instance of an operator in a DAG and provide it with its required parameters, it becomes a task.
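The idea of a class that wraps one unit of work can be sketched in plain Python. The following is a simplified model under stated assumptions, not Airflow's actual BaseOperator: the class names MiniBaseOperator and PrintOperator are hypothetical, and only the task_id argument and the execute method mirror names that exist in Airflow.

```python
from abc import ABC, abstractmethod


class MiniBaseOperator(ABC):
    """Hypothetical stand-in for a base operator; not Airflow's BaseOperator."""

    def __init__(self, task_id: str):
        # Every task needs an identifier that is unique within its DAG.
        self.task_id = task_id

    @abstractmethod
    def execute(self, context: dict):
        """Run the unit of work; subclasses supply the logic."""


class PrintOperator(MiniBaseOperator):
    """Hypothetical operator that wraps one unit of work: building a message."""

    def __init__(self, task_id: str, message: str):
        super().__init__(task_id=task_id)
        self.message = message

    def execute(self, context: dict):
        # The caller only supplies parameters; the logic lives in the class.
        return f"{self.task_id}: {self.message}"


# Instantiating the operator with its parameters yields one task.
my_task = PrintOperator(task_id="my_task", message="Hello world!")
result = my_task.execute(context={})
```

In this model, the DAG author only chooses parameters (task_id and message), while the operator class owns the logic that runs when the task executes.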

A base set of operators is contained in the Airflow standard provider package, which is pre-installed when using the Astro CLI. Other operators are contained in specialized provider packages, often centered around a specific technology or service. For example, the Airflow Snowflake Provider package contains operators for interacting with Snowflake, while the Airflow Google provider package contains operators for interacting with Google Cloud services. There are also several packages that contain operators that can be used with a set of services:

Operator examples

The following are some of the most frequently used Airflow operators. Note that only a few of the possible parameters are shown. Refer to the Astronomer Registry for a full list of parameters for each operator.

  • PythonOperator: Executes a Python function. It is functionally equivalent to using the @task decorator. See Introduction to the TaskFlow API and Airflow decorators.

    from airflow.providers.standard.operators.python import PythonOperator

    def _my_python_function():
        print("Hello world!")

    my_task = PythonOperator(
        task_id="my_task",
        python_callable=_my_python_function,
    )
  • BashOperator: Executes a Bash script. See also the Using the BashOperator guide.

    from airflow.providers.standard.operators.bash import BashOperator

    my_task = BashOperator(
        task_id="my_task",
        bash_command="echo 'Hello world!'",
    )
  • KubernetesPodOperator: Executes a task defined as a Docker image in a Kubernetes Pod. See Use the KubernetesPodOperator.

    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

    my_task = KubernetesPodOperator(
        task_id="my_task",
        kubernetes_conn_id="<my-kubernetes-connection>",
        name="<my-pod-name>",
        namespace="<my-namespace>",
        image="python:3.12-slim",  # Docker image to run
        cmds=["python", "-c"],  # Command to run in the container
        arguments=["print('Hello world!')"],  # Arguments to the command
    )
  • SQLExecuteQueryOperator: Executes a SQL query against a relational database.

    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

    my_task = SQLExecuteQueryOperator(
        task_id="my_task",
        sql="SELECT * FROM my_table",
        database="<my-database>",
        conn_id="<my-connection>",
    )
  • EmptyOperator: A no-op operator that does nothing. This is useful for creating placeholder tasks in a DAG.

    from airflow.providers.standard.operators.empty import EmptyOperator

    my_task = EmptyOperator(task_id="my_task")

All operators inherit from the abstract BaseOperator class, which contains the logic to execute the work of the operator within the context of a DAG.

Arguments of the BaseOperator class can be passed to all operators. The most common arguments are:

  • task_id: A unique identifier for the task. This is required for all operators.
  • retries: The number of times to retry the task if it fails. This is optional and defaults to 0. See Rerun Airflow DAGs and tasks.
  • pool: The name of the pool to use for the task. This is optional and defaults to None. See Airflow pools.
  • execution_timeout: The maximum time to wait for the task to complete. This is optional and defaults to None. It is a good practice to set this value to prevent tasks from running indefinitely.

You can set these arguments and other BaseOperator arguments (other than task_id, which must be unique per task) at the DAG level for all tasks in a DAG by using the default_args dictionary. You can override these values for individual tasks by setting the same arguments in the task definition.
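The precedence rule can be modeled in plain Python. This sketch only illustrates the behavior described above and is not how Airflow implements it: resolve_task_args is a hypothetical helper, while retries and execution_timeout are the BaseOperator arguments listed earlier.

```python
from datetime import timedelta


def resolve_task_args(default_args: dict, **task_args) -> dict:
    """Hypothetical helper: task-level arguments win over DAG-level defaults."""
    # Start from the DAG-level defaults, then apply task-level overrides.
    return {**default_args, **task_args}


# DAG-level defaults shared by every task.
default_args = {
    "retries": 2,
    "execution_timeout": timedelta(minutes=30),
}

# A task that sets nothing extra inherits both defaults.
inherits = resolve_task_args(default_args, task_id="extract")

# A task that sets retries itself overrides only that one value.
overrides = resolve_task_args(default_args, task_id="load", retries=5)
```

In a real DAG, the dictionary is passed as the default_args argument of the DAG, and an override is set directly on the individual operator or task.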

import hashlib
import json
from datetime import datetime

from airflow.exceptions import AirflowException
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.local_to_s3 import (
    LocalFilesystemToS3Operator,
)
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.sql import SQLCheckOperator
from airflow.utils.task_group import TaskGroup


# The file(s) to upload shouldn't be hardcoded in a production setting,
# this is just for demo purposes.
CSV_FILE_NAME = "forestfires.csv"
CSV_FILE_PATH = f"include/sample_data/forestfire_data/{CSV_FILE_NAME}"


@dag(
    "simple_redshift_3",
    start_date=datetime(2021, 7, 7),
    description="""A sample Airflow DAG to load data from csv files to S3
    and then Redshift, with data integrity and quality checks.""",
    schedule=None,
    template_searchpath="/usr/local/airflow/include/sql/redshift_examples/",
    catchup=False,
)
def simple_redshift_3():
    """
    Before running the DAG, set the following in an Airflow
    or Environment Variable:
    - key: aws_configs
    - value: { "s3_bucket": [bucket_name], "s3_key_prefix": [key_prefix],
             "redshift_table": [table_name]}
    Fully replacing [bucket_name], [key_prefix], and [table_name].
    """

    upload_file = LocalFilesystemToS3Operator(
        task_id="upload_to_s3",
        filename=CSV_FILE_PATH,
        dest_key="{{ var.json.aws_configs.s3_key_prefix }}/" + CSV_FILE_PATH,
        dest_bucket="{{ var.json.aws_configs.s3_bucket }}",
        aws_conn_id="aws_default",
        replace=True,
    )

    @task
    def validate_etag():
        """
        #### Validation task
        Check the destination ETag against the local MD5 hash to ensure
        the file was uploaded without errors.
        """
        s3 = S3Hook()
        aws_configs = Variable.get("aws_configs", deserialize_json=True)
        obj = s3.get_key(
            key=f"{aws_configs.get('s3_key_prefix')}/{CSV_FILE_PATH}",
            bucket_name=aws_configs.get("s3_bucket"),
        )
        obj_etag = obj.e_tag.strip('"')
        # Change `CSV_FILE_PATH` to `CSV_CORRUPT_FILE_PATH` for the "sad path".
        file_hash = hashlib.md5(open(CSV_FILE_PATH).read().encode("utf-8")).hexdigest()
        if obj_etag != file_hash:
            raise AirflowException(
                """Upload Error: Object ETag in S3 did not match
                hash of local file."""
            )

    # Tasks that were created using decorators have to be called to be used
    validate_file = validate_etag()

    # --- Create Redshift Table --- #
    create_redshift_table = PostgresOperator(
        task_id="create_table",
        sql="create_redshift_forestfire_table.sql",
        postgres_conn_id="redshift_default",
    )

    # --- Second load task --- #
    load_to_redshift = S3ToRedshiftOperator(
        task_id="load_to_redshift",
        s3_bucket="{{ var.json.aws_configs.s3_bucket }}",
        s3_key="{{ var.json.aws_configs.s3_key_prefix }}" + f"/{CSV_FILE_PATH}",
        schema="PUBLIC",
        table="{{ var.json.aws_configs.redshift_table }}",
        copy_options=["csv"],
    )

    # --- Redshift row validation task --- #
    validate_redshift = SQLCheckOperator(
        task_id="validate_redshift",
        conn_id="redshift_default",
        sql="validate_redshift_forestfire_load.sql",
        params={"filename": CSV_FILE_NAME},
    )

    # --- Row-level data quality check --- #
    with open("include/validation/forestfire_validation.json") as ffv:
        with TaskGroup(group_id="row_quality_checks") as quality_check_group:
            ffv_json = json.load(ffv)
            for id, values in ffv_json.items():
                values["id"] = id
                SQLCheckOperator(
                    task_id=f"forestfire_row_quality_check_{id}",
                    conn_id="redshift_default",
                    sql="row_quality_redshift_forestfire_check.sql",
                    params=values,
                )

    # --- Drop Redshift table --- #
    drop_redshift_table = PostgresOperator(
        task_id="drop_table",
        sql="drop_redshift_forestfire_table.sql",
        postgres_conn_id="redshift_default",
    )

    begin = EmptyOperator(task_id="begin")
    end = EmptyOperator(task_id="end")

    # --- Define task dependencies --- #
    chain(
        begin,
        upload_file,
        validate_file,
        create_redshift_table,
        load_to_redshift,
        validate_redshift,
        quality_check_group,
        drop_redshift_table,
        end,
    )


simple_redshift_3()

Best practices

Operators typically only require a few parameters. Keep the following considerations in mind when using Airflow operators:

  • The Astronomer Registry is the best resource for learning what operators are available and how they are used.
  • The Airflow standard provider package includes basic operators such as the PythonOperator and BashOperator. These operators are automatically available in your Airflow environment if you are using the Astro CLI. All other operators are part of provider packages, some of which you must install separately, depending on what type of Airflow distribution you are using.
  • You can combine operators and decorators freely in the same DAG. Many users choose to use the @task decorator for most of their tasks, and add operators for tasks where a specialized operator exists for their use case. The example above shows a DAG that combines several operators (such as LocalFilesystemToS3Operator and SQLCheckOperator) with one @task decorated task.
  • If an operator exists for your specific use case, you should use it instead of your own Python functions or hooks. This makes your DAGs easier to read and maintain.
  • If an operator doesn’t exist for your use case, you can either write custom Python code in an @task decorated task or a PythonOperator, or extend an existing operator to meet your needs. For more information about customizing operators, see Custom hooks and operators.
  • Sensors are a type of operator that waits for something to happen. They can be used to detect events in systems outside of Airflow.
  • Deferrable operators are a type of operator that releases its worker slot while waiting for its work to be completed. This can result in cost savings and greater scalability. Astronomer recommends using a deferrable operator whenever one exists for your use case and your task takes longer than a minute. Many operators that may need to wait for something have a deferrable mode, which you can enable by setting their deferrable parameter to True.
  • Any operator that interacts with a service external to Airflow typically requires a connection so that Airflow can authenticate to that external system. For more information about setting up connections, see Managing your connections in Apache Airflow or in the examples to follow.
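The waiting behavior of sensors mentioned in the list above can be sketched as a simple polling loop. This is a minimal illustration under stated assumptions, not Airflow's sensor implementation: wait_for and file_has_arrived are hypothetical names, and the poke_interval and timeout arguments are borrowed from the parameter names Airflow sensors use.

```python
import time
from typing import Callable


def wait_for(
    condition: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Hypothetical polling loop: check `condition` until it holds or time runs out."""
    waited = 0.0
    while True:
        if condition():
            return True
        if waited >= timeout:
            return False
        # Pause between checks; the caller stays occupied for the whole wait.
        sleep(poke_interval)
        waited += poke_interval


# Simulate an external event that only becomes visible on the third check.
checks = {"count": 0}


def file_has_arrived() -> bool:
    checks["count"] += 1
    return checks["count"] >= 3


# Pass a no-op sleep so the demonstration finishes immediately.
arrived = wait_for(file_has_arrived, poke_interval=5, timeout=60, sleep=lambda _: None)
```

A loop like this keeps its worker busy for the entire wait, which is the cost that deferrable operators avoid by releasing the worker slot while waiting.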