When orchestrating workflows in Apache Airflow, DAG authors often find themselves at a crossroad: choose the modern, Pythonic approach of the TaskFlow API or stick to the well-trodden path of traditional operators (e.g. BashOperator, SqlExecuteQueryOperator, etc.). Luckily, the TaskFlow API was implemented in such a way that allows TaskFlow tasks and traditional operators to coexist, offering users the flexibility to combine the best of both worlds.
Traditional operators are the building blocks that older Airflow versions employed, and while they are powerful and diverse, they can sometimes lead to boilerplate-heavy DAGs. For users that employ lots of Python functions in their DAGs, TaskFlow tasks represent a simpler way to transform functions into tasks, with a more intuitive way of passing data between tasks.
Both methodologies have their strengths, but many DAG authors mistakenly believe they must stick to one or the other. This belief can be limiting, especially when certain scenarios might benefit from a mix of both. Certain tasks might be more succinctly represented with traditional operators, while others might benefit from the brevity of the TaskFlow API. While the TaskFlow API simplifies data passing with direct function-to-function parameter passing, there are scenarios where the explicit nature of XComs in traditional operators can be advantageous for clarity and debugging.
In this blog post, I’ll show you how you can use both task definition methods together, along with some examples demonstrating how using both types together create unique synergies for more efficient DAGs!
Why Should You Combine the TaskFlow API with Traditional Operators in Airflow?
Integrating the TaskFlow API with traditional operators in Airflow offers DAG authors a potent combination of advantages that can’t be achieved using just one method in isolation.
Firstly, by using both, DAG creators gain access to the breadth of built-in functionality and fine-grained control that traditional operators offer, while also benefiting from the succinct, Pythonic syntax of the TaskFlow API. This dual approach enables more concise DAG definitions, minimizing boilerplate code while still allowing for complex orchestrations.
Moreover, combining methods facilitates smoother transitions in workflows that evolve over time. Teams can incrementally adopt the TaskFlow API in legacy DAGs written with traditional operators, ensuring that there’s no need for disruptive overhauls.
Additionally, blending the two means that DAGs can benefit from the dynamic task generation inherent in the TaskFlow API, alongside the explicit dependency management provided by traditional operators.
This results in DAGs that are not only more adaptable and scalable but also clearer in representing dependencies and data flow.
Passing Data Between Different Types of Tasks
For those familiar with traditional operators, the practice of using XCom’s push and pull mechanism is well-known for passing data between tasks. And if you use TaskFlow, you know you can inherently pass data just by setting your task dependencies.
def push_function(): xcom_push("key", "value_to_pass") def pull_function(): data = xcom_pull("key")
TaskFlow API Definition:
@task def push_task(): return "value_to_pass" @task def pull_task(data):
However, when people begin using both together, one common question that comes up is how to pass data between traditional operators and TaskFlow API tasks.
Transferring Data From TaskFlow API Tasks to Traditional Tasks
In this first example, we’ll pass data from a TaskFlow task to a traditional one by initially capturing the output of the TaskFlow task. Then, we’ll use it within the traditional task, using the
xcom_pull function to extract the XCom from the TaskFlow API.
The TaskFlow API is essentially a wrapper around the existing traditional XCom handling functionality that eliminates the boilerplate push/pull code that you previously needed. This means that you can pull from a TaskFlowAPI task using the same traditional syntax of
ti.xcom_pull, even though we didn’t explicitly push a value from the
push_data task. This is because the TaskFlow API took our return value and saved it as an Xcom, just as if we had used the
xcom_push function on our output.
One thing to note here is that because the TaskFlow API uses a lazy accessor, if the return value isn’t consumed anywhere, it won’t be saved as an XCom.
@dag(schedule_interval="@daily", start_date=datetime(2022, 1, 1)) def mixed_dag(): @task def push_data(): return "TaskFlow_data" TaskFlow_data = push_data() def pull_from_traditional(**kwargs): ti = kwargs["ti"] received_data = ti.xcom_pull(task_ids="push_data") print(received_data) traditional_task = PythonOperator( task_id="pull_from_traditional", python_callable=pull_from_traditional, provide_context=True, ) mixed_dag_instance = mixed_dag()
Transferring Data From Traditional Task to TaskFlow API Task
You can also push data from a traditional task and then consume it in a TaskFlow task. While you still have to use **kwargs as a parameter in the push task and either return a value or use
.xcom_push, you can access the value by calling ‘
taskname.output’ and saving the output. This allows you to then use the output of that traditional operator just as you would if the task was built with the TaskFlow API. You can see this in the example below, where
traditional_push_task.output is saved as data and then used as a parameter for the ‘
pull_data’ task to pass the value ‘
traditional_data’ into the ‘
@dag(schedule_interval="@daily", start_date=datetime(2022, 1, 1)) def another_mixed_dag(): def push_from_traditional(**kwargs): return "Traditional_data" traditional_push_task = PythonOperator( task_id="push_from_traditional", python_callable=push_from_traditional, provide_context=True, ) @task def pull_data(received_data): print(received_data) pull_data(traditional_push_task.output) another_mixed_dag_instance = another_mixed_dag()
Practical Examples of Combining Task Definitions
While it’s great that you can use both of these task definition approaches combined, I wouldn’t be writing this blog post if there weren’t some benefits other than just creating prettier DAGs! In this section, I’ll go over some tricks for combining both for different use cases!
Reference .output from traditional ops
Imagine you’re processing data using the SQLExecuteQueryOperator, and the next task is a data validation step using a TaskFlow task. With
.output, you can effortlessly pass the processed data from the traditional operator to the TaskFlow task for validation just by referencing ‘
sql_query = """ SELECT 'Processed Data' as data; """ process_task = SQLExecuteQueryOperator( task_id="process_task", sql=sql_query, conn_id="your_database_conn_id" ) @task def validate_data(data): print(data) validate_data(process_task.output)
Converting a Standard Python Function into a TaskFlow task
You may have some legacy code or shared utility functions in a library. Instead of rewriting them, you can convert these functions into TaskFlow tasks when needed and revert back to the normal function when not in a DAG context. This can be especially useful if you have a large set of Python scripts that you are using, allowing you to easily turn the scripts into TaskFlow tasks just by declaring it in the following syntax.
def my_function(data): return data * 2 task_version = task(my_function)("Sample Data")
This also allows you to easily declare the same task with different inputs, as one could swap out the “Sample Data” in the example above and easily create a parallel task instance with a different dataset.
Extracting the Task from a TaskFlow Task
If you want to set advanced configurations or leverage operator methods available to traditional tasks, extracting the underlying traditional task from a TaskFlow task is handy. When you define a task using the Taskflow API, it’s essentially a wrapper around a traditional task. This means that underneath the Taskflow decorator, there exists a traditional task object with all the familiar methods and attributes which you can access by calling ‘
In the below example, after the line
traditional_task_version = my_taskflow_task.task, the variable
traditional_task_version contains the traditional PythonOperator representation of the Taskflow task. You can then interact with this traditional task as you would in older versions of Airflow, accessing its methods, attributes, and configurations.
@task def my_taskflow_task(): return "Hello from Taskflow!" traditional_task_version = my_taskflow_task.task print( type(traditional_task_version) ) # This would print something like: <class 'airflow.operators.python.PythonOperator'> return my_taskflow_task()
Now that we’ve explored some of the different ways you can combine TaskFlow API and traditional operators to create more flexible DAGs, I want to show them in action in a real-world use case!
This example is a DAG similar to one our Customer Success Engineering Team created that is focused on processing reports in a Google Cloud Storage (GCS) bucket, enriching these objects with data fetched from Snowflake, and then triggering subsequent processing steps. By leveraging both TaskFlow API mechanisms and traditional operators, they were able to reap some unique benefits for their pipeline that they wouldn’t have if they had only stuck to one or the other. These include:
Dynamic Task Generation:
The process_org_files_and_upload function, decorated with the @task decorator, is a Python function turned into a task using the TaskflowAPI. This task processes files and prepares them dwfor upload. The output of process_org_files_and_upload_task is dynamically used to generate multiple instances of the LoadFileOperator through the .map() method. This allows for dynamic task generation based on the output of another task, which is a powerful capability introduced by the TaskflowAPI.
Flexibility and Integration:
The SQLExecuteQueryOperator is a regular operator. The fact that it can be seamlessly integrated and chained with the outputs of TaskflowAPI tasks (like process_org_files_and_upload_task) showcases the flexibility of combining both approaches.
This integration allows users to benefit from the dynamic nature of the TaskflowAPI, while also leveraging the out-of-the-box functionality provided by regular operators.
Parametrization and Templating:
The DAG accepts several parameters like gcs_input_file, report_date, and organization_name. These parameters are passed into the process_org_files_and_upload task using Jinja templating, showing how you can easily parameterize python functions for dynamic task generation through the TaskFlow API.
Dynamic Expansion of Operator Arguments:
The LoadFileOperator.partial() and MergeOperator.partial() methods, followed by .expand_kwargs(), indicate a dynamic expansion of arguments based on the outputs of preceding tasks. This kind of chaining and dynamic argument setting is an advanced use case that’s made more straightforward when combining the TaskflowAPI with regular operators.
Clean Code Structure:
The combination of TaskflowAPI and regular operators allows for a structured and readable DAG definition. Tasks are clearly defined and chained, and the logic is easy to follow. It’s a testament to how Airflow can be both dynamic and organized.
You can explore the code here.
In the dynamic world of Apache Airflow, there’s no one-size-fits-all approach to writing your DAGs. Whether you’re a fan of the TaskFlow API’s Pythonic elegance or appreciate the explicit nature of traditional operators, remember: you don’t have to choose. By understanding the strengths of both methodologies and integrating them where they shine brightest, you can craft DAGs that are efficient, readable, and tailored to your needs.
If you’re interested in learning more, or seeing additional examples, check out the Astronomer Learn page!