The TaskFlow API is a functional API for using decorators to define DAGs and tasks, which simplifies passing data between tasks and defining dependencies. You can use TaskFlow decorator functions (for example, @task) to pass data between tasks by providing the output of one task as an argument to another task. Decorators are a simpler, cleaner way to define your tasks and DAGs and can be used in combination with traditional operators.
In this guide, you’ll learn about the benefits of decorators and the decorators available in Airflow. You’ll also review an example DAG and learn when you should use decorators and how you can combine them with traditional operators in a DAG.
In Python, decorators are functions that take another function as an argument and extend the behavior of that function. For example, the @multiply_by_100_decorator takes any function as the decorated_function argument and returns the result of that function multiplied by 100.
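A minimal sketch of such a decorator in plain Python follows. The name multiply_by_100_decorator comes from the text above; the add function is only an illustration. The wrapper accepts any positional and keyword arguments, so it works with any function that returns a number.

```python
import functools


def multiply_by_100_decorator(decorated_function):
    """Wrap any function so that its result is multiplied by 100."""

    @functools.wraps(decorated_function)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        return decorated_function(*args, **kwargs) * 100

    return wrapper


@multiply_by_100_decorator
def add(num1, num2):
    return num1 + num2


print(add(1, 9))  # prints 1000
```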
In the context of Airflow, decorators contain more functionality than this simple example, but the basic idea is the same: the Airflow decorator function extends the behavior of a normal Python function to turn it into an Airflow task, task group or DAG.
The purpose of the TaskFlow API in Airflow is to simplify the DAG authoring experience by eliminating the boilerplate code required by traditional operators. The result can be cleaner DAG files that are more concise and easier to read.
In general, whether you use the TaskFlow API is a matter of your own preference and style. In most cases, a TaskFlow decorator and the corresponding traditional operator will have the same functionality. You can also mix decorators and traditional operators within a single DAG.
The TaskFlow API allows you to write your Python tasks with decorators. It handles passing data between tasks using XCom and infers task dependencies automatically.
Using decorators to define your Python functions as tasks is easy. Consider a before-and-after example: a basic ETL DAG with tasks to get data from an API, process the data, and store it. With traditional syntax, each step is an explicitly instantiated PythonOperator; with Airflow decorators, each step is a decorated Python function.
The decorated version of the DAG eliminates the need to explicitly instantiate the PythonOperator, has much less code and is easier to read. Notice that it also doesn’t require using ti.xcom_pull and ti.xcom_push to pass data between tasks. This is all handled by the TaskFlow API when you define your task dependencies with store_data(process_data(extract_bitcoin_price())).
Here are some other things to keep in mind when using decorators:
You must call all decorated functions in your DAG file so that Airflow can register the task or DAG. For example, the DAG function of the ETL DAG is named taskflow, so taskflow() must be called at the end of the DAG file.
When you define a task, the task_id defaults to the name of the function you decorated. To change this, pass a task_id to the decorator, for example @task(task_id="extract"). Similarly, you can define other BaseOperator task-level parameters, such as retries or pool, within the decorator.
You can also override task-level parameters when you call the task by using the .override() method, for example taskflow_func.override(retries=5)(). The trailing () calls the task with the overridden parameters applied.
If you call the same task multiple times and do not override the task_id, Airflow creates multiple unique task IDs by appending a number to the end of the original task ID (for example, say_hello, say_hello__1, say_hello__2, and so on).
You can also apply a decorator to a function that is imported from another file.
This is recommended in cases where you have lengthy Python functions since it will make your DAG file easier to read.
You can assign the output of a called decorated task to a Python object to be passed as an argument into another decorated task. This is helpful when the output of one decorated task is needed in several downstream functions.
For more examples of how to use Airflow task decorators, see the Astronomer webinars and the Apache Airflow TaskFlow API tutorial.
If you have a DAG that uses PythonOperator and other operators that don’t have decorators, you can combine decorated functions and traditional operators in the same DAG. For example, you can add an EmailOperator to the ETL DAG so that a notification is sent after the data is stored.
Note that when adding traditional operators, dependencies are still defined using bit-shift operators.
You can pass information between decorated tasks and traditional operators using XCom. See the following sections for examples.
If both tasks are defined using the TaskFlow API, you can pass information directly between them by providing the called task function as a positional argument to the downstream task. Airflow will infer the dependency between the two tasks.
Pass the .output value of a traditional operator-based task to the callable of a TaskFlow task to automatically create a relationship between the two tasks.
When no outputs are passed as arguments to automatically register dependencies, but you still want one task to complete before the second starts, use >> to create the relationship between the first task and the second task:
The data a task returns varies based on its operator. A searchable registry of operators at the Airflow Registry describes what operators return and documents any operator parameters that can be used to control the return format.
TaskFlow tasks, when called, return a reference (referred to internally as an XComArg) that can be passed into templateable fields of traditional operators to automatically create a relationship between the two tasks.
The list of templateable fields varies by operator. A searchable registry of operators at the Airflow Registry details which fields are templateable by default and users can control which fields are templateable.
You can also pass the result of a TaskFlow function to a traditional PythonOperator’s callable as an argument, for example through the templateable op_args or op_kwargs parameters.
When only the order of task execution is important, don’t pass the return value of the first task as a parameter to the second task; instead, use >> to create the relationship explicitly.
For completeness, you can also use the output of one traditional operator in another traditional operator by accessing the .output attribute of the upstream task. When .output is passed to a templateable field, such as op_args of the PythonOperator, Airflow infers the dependency; you can still declare it explicitly with bit-shift operators to make the order obvious.
If you want to access an XCom other than the return value of an operator, use the xcom_pull method inside a function. For an example, see how to access ti / task_instance in the Airflow context. Traditional operators can also pull from XCom using Jinja templates in templateable parameters.
There are several decorators available to use with Airflow. This list provides a reference of currently available decorators:
- @dag(), which creates a DAG.
- @task_group(), which creates a TaskGroup.
- @task(), which creates a Python task.
- @task.bash(), which creates a BashOperator task.
- @task.virtualenv(), which runs your Python task in a virtual environment.
- @task.docker(), which creates a DockerOperator task.
- @task.short_circuit(), which evaluates a condition and skips downstream tasks if the condition is False.
- @task.branch(), which creates a branch in your DAG based on an evaluated condition.
- @task.branch_external_python, which creates a branch in your DAG running Python code in a pre-existing virtual environment.
- @task.branch_virtualenv, which creates a branch in your DAG running Python code in a newly created virtual environment. The environment can be cached by providing a venv_cache_path.
- @task.kubernetes(), which runs a KubernetesPodOperator task.
- @task.sensor(), which turns a Python function into a sensor.
- @task.pyspark(), which is injected with a SparkSession and SparkContext object if available.

You can also create your own custom task decorator.