Use the @task.kubernetes decorator

The @task.kubernetes decorator is the TaskFlow alternative to the traditional KubernetesPodOperator: it runs the decorated Python function as a task in its own Kubernetes pod. Because the function is executed as a Python script inside the container, the Docker image you pass to the decorator's image parameter must have a Python interpreter available.
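To make the image requirement concrete, the following stdlib-only sketch mimics the idea locally: the function's source is turned into a standalone script and executed by a separate Python interpreter, which stands in for the container. It is a simplified simulation, not the decorator's actual implementation. The helper `run_in_isolated_interpreter` and the JSON-over-stdout exchange are assumptions made for illustration, and `sys.executable` plays the role of the Python interpreter that the Docker image must provide.

```python
import json
import subprocess
import sys
import textwrap


def run_in_isolated_interpreter(func_source, func_name, *args):
    """Hypothetical stand-in for "run this function in its own pod".

    The child process only sees the script text and the serialized
    arguments, similar to how a pod only sees what is shipped into it.
    """
    script = "\n".join([
        textwrap.dedent(func_source),
        "import json, sys",
        "args = json.loads(sys.argv[1])",
        f"result = {func_name}(*args)",
        "print(json.dumps(result))",
    ])
    completed = subprocess.run(
        # sys.executable stands in for the image's Python interpreter
        [sys.executable, "-c", script, json.dumps(list(args))],
        capture_output=True,
        text=True,
        check=True,
    )
    # the last stdout line carries the JSON-encoded return value
    return json.loads(completed.stdout.strip().splitlines()[-1])


TRANSFORM_SOURCE = """
def transform(data_point):
    return 23 * int(data_point)
"""

if __name__ == "__main__":
    print(run_in_isolated_interpreter(TRANSFORM_SOURCE, "transform", 2))  # prints 46
```

If the interpreter were missing (as it would be in an image without Python), the subprocess call would fail before the function ever ran, which is the situation the image requirement guards against.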

As with regular @task-decorated functions, you can pass XComs as arguments to the decorated function, and they are available to the Python script running in the dedicated Kubernetes pod. If do_xcom_push is set to True in the decorator parameters, the function's return value is pushed to XCom.
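The KubernetesPodOperator documentation describes how values leave the pod: with do_xcom_push enabled, the container writes its result as JSON to `/airflow/xcom/return.json`, and a sidecar container reads that file so the value can be pushed to XCom. Assuming @task.kubernetes (which is built on the KubernetesPodOperator) relies on the same convention, the sketch below imitates that round trip. A temporary directory stands in for the `/airflow/xcom` volume, and the helpers `write_return_value` and `read_return_value` are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def write_return_value(xcom_dir, value):
    """Pod side (hypothetical): serialize the return value as JSON."""
    path = Path(xcom_dir) / "return.json"
    # json.dumps raises TypeError for values that are not JSON-serializable
    path.write_text(json.dumps(value))
    return path


def read_return_value(xcom_dir):
    """Sidecar side (hypothetical): read the JSON file back into Python."""
    return json.loads((Path(xcom_dir) / "return.json").read_text())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as xcom_dir:
        write_return_value(xcom_dir, 23 * 4)
        print(read_return_value(xcom_dir))  # prints 92
        try:
            write_return_value(xcom_dir, {1, 2, 3})  # a set is not valid JSON
        except TypeError as err:
            print(f"not XCom-friendly: {err}")
```

Under this assumption, it is safest to return plain JSON types (numbers, strings, lists, dicts) from the decorated function; a tuple, for example, comes back as a list, and sets or custom objects fail to serialize.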

Astronomer recommends using the @task.kubernetes decorator instead of the KubernetesPodOperator when using XCom with Python scripts in a dedicated Kubernetes pod.

```python
import random

from pendulum import datetime
from airflow.configuration import conf
from airflow.decorators import dag, task

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")


@dag(
    start_date=datetime(2023, 1, 1),
    catchup=False,
    schedule="@daily",
)
def kubernetes_decorator_example_dag():
    @task
    def extract_data():
        # simulating querying from a database
        data_point = random.randint(0, 100)
        return data_point

    @task.kubernetes(
        # specify the Docker image to launch; it must be able to run a Python script
        image="python",
        # launch the Pod on the same cluster as Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace as Airflow is running in
        namespace=namespace,
        # Pod configuration
        # naming the Pod
        name="my_pod",
        # log stdout of the container as task logs
        get_logs=True,
        # log events in case of Pod failure
        log_events_on_failure=True,
        # enable pushing to XCom
        do_xcom_push=True,
    )
    def transform(data_point):
        multiplied_data_point = 23 * int(data_point)
        return multiplied_data_point

    @task
    def load_data(**context):
        # pull the XCom value that has been pushed by the transform task
        transformed_data_point = context["ti"].xcom_pull(
            task_ids="transform", key="return_value"
        )
        print(transformed_data_point)

    # load_data pulls its input from XCom itself and accepts no positional
    # argument, so only the dependency on transform is declared here
    transform(extract_data()) >> load_data()


kubernetes_decorator_example_dag()
```
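Because the three task bodies are plain Python, one way to sanity-check the data flow before involving a cluster is to call the same logic as ordinary functions. The sketch below copies the task bodies from the DAG without the Airflow decorators and chains them directly; the plain-function `load_data(transformed_data_point)` signature is an assumption for local testing only. It does not exercise Kubernetes, XCom, or the Docker image, only the arithmetic that flows from extract to transform to load.

```python
import random


# Plain-Python copies of the task bodies from the DAG, without Airflow.
def extract_data():
    # simulating querying from a database, as in the DAG
    return random.randint(0, 100)


def transform(data_point):
    return 23 * int(data_point)


def load_data(transformed_data_point):
    # local stand-in: receives the value directly instead of via xcom_pull
    print(transformed_data_point)
    return transformed_data_point


if __name__ == "__main__":
    random.seed(0)  # fixed seed so repeated local runs are reproducible
    load_data(transform(extract_data()))
```

Since `random.randint(0, 100)` includes both endpoints, every result is a multiple of 23 between 0 and 2300.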