Use the KubernetesPodOperator

The KubernetesPodOperator (KPO) runs a Docker image in a dedicated Kubernetes Pod. By abstracting calls to the Kubernetes API, the KubernetesPodOperator lets you start and run Pods from Airflow using DAG code.

In this guide, you’ll learn:

  • The requirements for running the KubernetesPodOperator.
  • When to use the KubernetesPodOperator.
  • How to configure the KubernetesPodOperator.
  • The differences between the KubernetesPodOperator and the Kubernetes executor.

You’ll also learn how to use the KubernetesPodOperator to run a task in a language other than Python, how to use the KubernetesPodOperator with XComs, and how to launch a Pod in a remote AWS EKS Cluster.

On Astro, all of the infrastructure required to run the KubernetesPodOperator is hosted by Astronomer and managed automatically. Therefore, some of the use cases on this page might be simplified if you’re running the KubernetesPodOperator on Astro. See Run the KubernetesPodOperator on Astro to learn more.

Prerequisites

To use the KubernetesPodOperator, you need to install the Kubernetes provider package. To install it with pip, run:

$ pip install apache-airflow-providers-cncf-kubernetes==<version>

If you use the Astro CLI, you can alternatively install the package by adding the following line to the requirements.txt file of your Astro project:

apache-airflow-providers-cncf-kubernetes==<version>

Review the Airflow Kubernetes provider Documentation to make sure you install the correct version of the provider package for your version of Airflow.

You also need an existing Kubernetes cluster to connect to. This is commonly the same cluster that Airflow is running on, but it doesn’t have to be.

You don’t need to use the Kubernetes executor to use the KubernetesPodOperator. You can choose one of the following executors:

  • Local executor
  • LocalKubernetes executor
  • Celery executor
  • Kubernetes executor
  • CeleryKubernetes executor

On Astro, the infrastructure needed to run the KubernetesPodOperator with the Celery executor is included with all clusters by default. For more information, see Run the KubernetesPodOperator on Astro.

Run the KubernetesPodOperator locally

Setting up your local environment to use the KubernetesPodOperator can help you avoid time-consuming deployments to remote environments.

Use the steps below to quickly set up a local environment for the KubernetesPodOperator using the Astro CLI. Alternatively, you can use the Helm Chart for Apache Airflow to run open source Airflow within a local Kubernetes cluster. See Getting Started With the Official Airflow Helm Chart.

Step 1: Set up Kubernetes

Windows And Mac

The latest versions of Docker for Windows and Mac let you run a single node Kubernetes cluster locally. If you are using Windows, see Setting Up Docker for Windows and WSL to Work Flawlessly. If you are using Mac, see Docker Desktop for Mac user manual. It isn’t necessary to install Docker Compose.

  1. Open Docker and go to Settings > Kubernetes.

  2. Select the Enable Kubernetes checkbox.

  3. Click Apply and Restart.

  4. Click Install in the Kubernetes Cluster Installation dialog.

    Docker restarts and the status indicator changes to green to indicate Kubernetes is running.

Linux
  1. Install Microk8s. See Microk8s.

  2. Run microk8s.start to start Kubernetes.

Step 2: Update the kubeconfig file

Windows And Mac
  1. Use the following commands to copy the docker-desktop context from the Kubernetes configuration file and save it as a separate file in the /include/.kube/ folder in your Astro project. The config file contains all the information the KubernetesPodOperator uses to connect to your cluster.

    $ kubectl config use-context docker-desktop
    $ kubectl config view --minify --raw > <Astro project directory>/include/.kube/config

    After running these commands, you will find a config file in the /include/.kube/ folder of your Astro project which resembles this example:

    apiVersion: v1
    clusters:
    - cluster:
        certificate-authority-data: <certificate-authority-data>
        server: https://kubernetes.docker.internal:6443/
      name: docker-desktop
    contexts:
    - context:
        cluster: docker-desktop
        user: docker-desktop
      name: docker-desktop
    current-context: docker-desktop
    kind: Config
    preferences: {}
    users:
    - name: docker-desktop
      user:
        client-certificate-data: <client-certificate-data>
        client-key-data: <client-key-data>
  2. If you have issues connecting, check the server configuration in the kubeconfig file. If server: https://localhost:6445 is present, change to server: https://kubernetes.docker.internal:6443 to identify the localhost running Kubernetes Pods. If this doesn’t work, try server: https://host.docker.internal:6445.

  3. (Optional) Add the .kube folder to .gitignore if your Astro project is hosted in a GitHub repository and you want to prevent the file from being tracked by your version control tool.

  4. (Optional) Add the .kube folder to .dockerignore to exclude it from the Docker image.
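The server change described in the troubleshooting step can also be scripted. The following is a minimal sketch, assuming the kubeconfig has already been parsed into a Python dictionary (for example with a YAML parser). The function name point_clusters_at_docker_host and the two constants are hypothetical names chosen for this illustration. They are not part of Airflow or kubectl.

```python
from urllib.parse import urlsplit, urlunsplit

# Assumed target taken from the troubleshooting step above.
DOCKER_HOST = "kubernetes.docker.internal"
DOCKER_PORT = 6443


def point_clusters_at_docker_host(kubeconfig: dict) -> dict:
    """Return a copy of the kubeconfig with localhost servers rewritten."""
    updated = dict(kubeconfig)
    updated["clusters"] = []
    for entry in kubeconfig.get("clusters", []):
        cluster = dict(entry["cluster"])
        parts = urlsplit(cluster["server"])
        # Only touch entries that point at the local machine.
        if parts.hostname in ("localhost", "127.0.0.1"):
            netloc = f"{DOCKER_HOST}:{DOCKER_PORT}"
            cluster["server"] = urlunsplit(parts._replace(netloc=netloc))
        updated["clusters"].append({**entry, "cluster": cluster})
    return updated
```

The function leaves the input dictionary untouched, so you can compare the original and rewritten server values before writing the file back.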

Linux

In the /include/.kube/ folder in your Astro project, create a config file with:

$ microk8s.config > <Astro project directory>/include/.kube/config

Step 3: Create Kubernetes Connection in the Airflow UI

To run a Kubernetes Pod locally, create a Kubernetes connection in the local Airflow UI from a kubeconfig in JSON format. First, fill in the following JSON template with the values from the config file you created in the previous step:

{
  "apiVersion": "v1",
  "clusters": [
    {
      "cluster": {
        "certificate-authority-data": "<certificate-authority-data>",
        "server": "https://kubernetes.docker.internal:6443"
      },
      "name": "docker-desktop"
    }
  ],
  "contexts": [
    {
      "context": {
        "cluster": "docker-desktop",
        "user": "docker-desktop"
      },
      "name": "docker-desktop"
    }
  ],
  "current-context": "docker-desktop",
  "kind": "Config",
  "preferences": {},
  "users": [
    {
      "name": "docker-desktop",
      "user": {
        "client-certificate-data": "<client-certificate-data>",
        "client-key-data": "<client-key-data>"
      }
    }
  ]
}

Then, run astro dev start with the Astro CLI to spin up a local Airflow environment. Once your environment is running, open the connection management UI and create a new connection of the Kubernetes Cluster Connection type. Paste the JSON you created from the template into the Kube config (JSON format) field, and save the connection with the connection ID k8s_conn. If you use a different connection ID, update the following example DAG code accordingly.
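If you prefer to generate the JSON instead of editing the template by hand, a small helper can fill in the values and guarantee that the result is valid JSON. This is only a sketch: build_kube_config_json is a hypothetical function written for this guide, and its defaults assume the docker-desktop names and server URL from the template.

```python
import json


def build_kube_config_json(
    certificate_authority_data: str,
    client_certificate_data: str,
    client_key_data: str,
    server: str = "https://kubernetes.docker.internal:6443",
    name: str = "docker-desktop",
) -> str:
    """Fill the connection template and return it as a JSON string."""
    config = {
        "apiVersion": "v1",
        "clusters": [
            {
                "cluster": {
                    "certificate-authority-data": certificate_authority_data,
                    "server": server,
                },
                "name": name,
            }
        ],
        "contexts": [{"context": {"cluster": name, "user": name}, "name": name}],
        "current-context": name,
        "kind": "Config",
        "preferences": {},
        "users": [
            {
                "name": name,
                "user": {
                    "client-certificate-data": client_certificate_data,
                    "client-key-data": client_key_data,
                },
            }
        ],
    }
    # json.dumps escapes the values, so the output is always valid JSON.
    return json.dumps(config, indent=2)
```

Print the returned string and paste it into the Kube config (JSON format) field of the connection.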

Step 4: Run your container

To use the KubernetesPodOperator, you must define the configuration of each task and the Kubernetes Pod in which it runs, including its namespace and Docker image.

This example DAG runs a hello-world Docker image using the k8s_conn connection you defined in the previous step to run it on your local Kubernetes cluster.

from pendulum import datetime
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

with DAG(
    dag_id="example_kubernetes_pod",
    schedule="@once",
    start_date=datetime(2023, 3, 30),
) as dag:
    example_kpo = KubernetesPodOperator(
        kubernetes_conn_id="k8s_conn",
        image="hello-world",
        name="airflow-test-pod",
        task_id="task-one",
        is_delete_operator_pod=True,
        get_logs=True,
    )

    example_kpo

Step 5: View Kubernetes logs

(Optional) Use the kubectl command line tool to review the logs of any Pods created by the operator. The logs can help you find issues and troubleshoot. If you haven’t installed the kubectl command line tool, see Install Tools.

Windows And Mac

Run kubectl get pods -n $namespace to list the Pods in a namespace, then run kubectl logs {pod_name} -n $namespace to examine the logs for the Pod that just ran. By default, Docker Desktop runs Pods in the default namespace.

Linux

Run microk8s.kubectl get pods -n $namespace to list the Pods in a namespace, then run microk8s.kubectl logs {pod_name} -n $namespace to examine the logs for the Pod that just ran. By default, microk8s runs Pods in the default namespace.

When to use the KubernetesPodOperator

The KubernetesPodOperator runs any Docker image provided to it. Frequent use cases are:

  • Running a task in a language other than Python. This guide includes an example of how to run a Haskell script with the KubernetesPodOperator.
  • Having full control over the compute resources and memory a single task can use.
  • Executing tasks in a separate environment with individual packages and dependencies.
  • Running tasks that use a version of Python not supported by your Airflow environment.
  • Running tasks with specific Node (a virtual or physical machine in Kubernetes) constraints, such as only running on Nodes located in the European Union.

A comparison of the KubernetesPodOperator and the Kubernetes executor

Executors determine how your Airflow tasks are executed. The Kubernetes executor and the KubernetesPodOperator both dynamically launch and terminate Pods to run Airflow tasks. As the name suggests, the Kubernetes executor affects how all tasks in an Airflow instance are executed. The KubernetesPodOperator launches only its own task in a Kubernetes Pod with its own configuration. It does not affect any other tasks in the Airflow instance. To configure the Kubernetes executor, see Kubernetes Executor.

The following are the primary differences between the KubernetesPodOperator and the Kubernetes executor:

  • The KubernetesPodOperator requires a Docker image to be specified, while the Kubernetes executor doesn’t.
  • The KubernetesPodOperator defines one isolated Airflow task. In contrast, the Kubernetes executor is implemented at the configuration level of the Airflow instance, which means all tasks run in their own Kubernetes Pod. This might be desired in some use cases that require auto-scaling, but it’s not ideal for environments with a high volume of shorter running tasks.
  • In comparison to the KubernetesPodOperator, the Kubernetes executor has less abstraction over Pod configuration. All task-level configurations have to be passed to the executor as a dictionary using the BaseOperator's executor_config argument, which is available to all operators.
  • If a custom Docker image is passed to the Kubernetes executor’s base container by providing it to either the pod_template_file or the pod_override key in the dictionary for the executor_config argument, Airflow must be installed or the task will not run. A possible reason for customizing this Docker image would be to run a task in an environment with different versions of packages than other tasks running in your Airflow instance. This is not the case with the KubernetesPodOperator, which can run any valid Docker image.

Both the KubernetesPodOperator and the Kubernetes executor can use the Kubernetes API to create Pods for running tasks. Typically, the KubernetesPodOperator is ideal for controlling the environment in which the task runs, while the Kubernetes executor is ideal for controlling resource optimization. It’s common to use both the Kubernetes executor and the KubernetesPodOperator in the same Airflow environment, where all tasks need to run on Kubernetes but only some tasks require additional environment configurations.

How to configure the KubernetesPodOperator

The KubernetesPodOperator launches any valid Docker image provided to it in a dedicated Kubernetes Pod on a Kubernetes cluster. The KubernetesPodOperator supports arguments for some of the most common Pod settings. For advanced use cases, you can specify a Pod template file that supports all possible Pod settings.

The KubernetesPodOperator can be instantiated like any other operator within the context of a DAG.

Required arguments

  • task_id: A unique string identifying the task within Airflow.
  • namespace: The namespace within your Kubernetes cluster to which the new Pod is assigned.
  • name: The name of the Pod being created. This name must be unique for each Pod within a namespace.
  • image: The Docker image to launch. Images from hub.docker.com can be passed with just the image name, but you must provide the full URL for custom repositories.

Optional arguments

  • random_name_suffix: Generates a random suffix for the Pod name if set to True. Avoids naming conflicts when running a large number of Pods.

  • labels: A dictionary of key-value pairs that can be used to logically group decoupled objects together.

  • ports: Ports for the Pod.

  • reattach_on_restart: Defines how to handle losing the worker while the Pod is running. When set to True, the existing Pod reattaches to the worker on the next try. When set to False, a new Pod will be created for each try. The default is True.

  • is_delete_operator_pod: Determines whether to delete the Pod when it reaches its final state or when the execution is interrupted. The default is True.

  • get_logs: Determines whether the stdout of the container is forwarded to the Airflow logging system as task logs.

  • log_events_on_failure: Determines whether events are logged in case the Pod fails. The default is False.

  • env_vars: A dictionary of environment variables for the Pod.

  • container_resources: A k8s.V1ResourceRequirements object containing the resource requests and/or limits for the Pod.

    # from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    #     KubernetesPodOperator,
    # )
    # from kubernetes.client import CoreV1Api, V1Pod, models as k8s

    KubernetesPodOperator(
        # other arguments
        container_resources=k8s.V1ResourceRequirements(
            requests={"cpu": "100m", "memory": "64Mi", "ephemeral-storage": "1Gi"},
            limits={"cpu": "200m", "memory": "420Mi", "ephemeral-storage": "2Gi"},
        )
    )

    See the Kubernetes Documentation on Resource Management for Pods and Containers for more information.

Astronomer customers can set default resource requests and limits for all KPO tasks in their deployment settings. See Configure Kubernetes Pod resources. Setting the container_resources argument in a KPO task overrides the default settings. Note that using ephemeral-storage on Astro Hosted is currently in Public Preview.
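The requests and limits in the container_resources example are Kubernetes quantity strings such as 100m (a tenth of a CPU) and 64Mi (64 × 2^20 bytes). The following sketch shows one way to compare them before you submit a task, so that a request larger than its limit is caught early. The helpers parse_quantity and requests_exceeding_limits are hypothetical and handle only the common suffixes listed in the table. They are not a full implementation of the Kubernetes quantity format.

```python
from decimal import Decimal

# Multipliers for the suffixes this sketch understands.
_SUFFIXES = {
    "m": Decimal("0.001"),
    "k": Decimal(10) ** 3,
    "M": Decimal(10) ** 6,
    "G": Decimal(10) ** 9,
    "T": Decimal(10) ** 12,
    "Ki": Decimal(2) ** 10,
    "Mi": Decimal(2) ** 20,
    "Gi": Decimal(2) ** 30,
    "Ti": Decimal(2) ** 40,
}


def parse_quantity(quantity: str) -> Decimal:
    """Convert a quantity such as "100m" or "64Mi" to a plain number."""
    # Try two-character suffixes before one-character ones.
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if quantity.endswith(suffix):
            return Decimal(quantity[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(quantity)


def requests_exceeding_limits(requests: dict, limits: dict) -> list:
    """Return the resource names whose request is larger than its limit."""
    return [
        name
        for name, requested in requests.items()
        if name in limits and parse_quantity(requested) > parse_quantity(limits[name])
    ]
```

Passing the requests and limits dictionaries from the example above returns an empty list, because every request is below its limit.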

  • volumes: A list of k8s.V1Volume objects. See also this Kubernetes example DAG.
  • affinity and tolerations: Dictionaries of rules for Pod to Node assignments. Like the volumes parameter, these also require a k8s object.
  • pod_template_file: The path to a Pod template file.
  • full_pod_spec: A complete Pod configuration formatted as a Python k8s object.

You can also use many other arguments to configure the Pod and pass information to the Docker image. For a list of the available KubernetesPodOperator arguments, see the KubernetesPodOperator source code.

The following KubernetesPodOperator arguments can be used with Jinja templates: image, cmds, arguments, env_vars, labels, config_file, pod_template_file, and namespace.
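To illustrate why random_name_suffix avoids naming conflicts, the following sketch builds a Pod name from a base name plus a random suffix. It is not the provider's implementation. The function unique_pod_name, the 63-character limit, and the 8-character suffix are assumptions made for this example.

```python
import re
import secrets

MAX_POD_NAME_LENGTH = 63  # assumption: conservative limit used by this sketch


def unique_pod_name(base_name: str, suffix_length: int = 8) -> str:
    """Return a sanitized Pod name with a random hexadecimal suffix."""
    # Keep lowercase letters, digits, and hyphens; replace everything else.
    safe = re.sub(r"[^a-z0-9-]+", "-", base_name.lower()).strip("-")
    suffix = secrets.token_hex(suffix_length // 2)
    # Leave room for the hyphen and the suffix.
    safe = safe[: MAX_POD_NAME_LENGTH - len(suffix) - 1].rstrip("-")
    return f"{safe}-{suffix}"
```

Two calls with the same base name, for example unique_pod_name("my_pod"), produce names that share the my-pod- prefix but end in different random characters, so parallel task runs are very unlikely to collide within a namespace.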

Configure a Kubernetes connection

If you leave in_cluster=True, you only need to specify the KubernetesPodOperator’s namespace argument to establish a connection with your Kubernetes cluster. The Pod specified by the KubernetesPodOperator runs on the same Kubernetes cluster as your Airflow instance.

If you are not running Airflow on Kubernetes, or want to send the Pod to a different cluster than the one currently hosting your Airflow instance, you can create a Kubernetes Cluster connection which uses the Kubernetes hook to connect to the Kubernetes API of a different Kubernetes cluster. This connection can be passed to the KubernetesPodOperator using the kubernetes_conn_id argument and requires the following components to work:

  • A KubeConfig file, provided as either a path to the file or in JSON format.
  • The cluster context from the provided KubeConfig file.

The following image shows how to set up a Kubernetes cluster connection in the Airflow UI.

[Image: Kubernetes Cluster Connection]

The components of the connection can also be set or overwritten at the task level by using the arguments config_file (to specify the path to the KubeConfig file) and cluster_context. Setting these parameters in airflow.cfg has been deprecated.
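When a connection or task fails, it helps to know which API server a given cluster context actually points to. The following is a minimal sketch that resolves this from a kubeconfig parsed into a dictionary. resolve_cluster_server is a hypothetical helper, not part of the Kubernetes hook, and it assumes the standard clusters, contexts, and current-context keys shown in the examples earlier in this guide.

```python
from typing import Optional


def resolve_cluster_server(
    kubeconfig: dict, cluster_context: Optional[str] = None
) -> str:
    """Return the API server URL that the selected context points to."""
    # Fall back to the file's current-context when none is given.
    context_name = cluster_context or kubeconfig["current-context"]
    contexts = {c["name"]: c["context"] for c in kubeconfig["contexts"]}
    if context_name not in contexts:
        raise KeyError(f"context {context_name!r} not found in kubeconfig")
    cluster_name = contexts[context_name]["cluster"]
    clusters = {c["name"]: c["cluster"] for c in kubeconfig["clusters"]}
    return clusters[cluster_name]["server"]
```

Calling the function without cluster_context mirrors what happens when you rely on the current context of the file. Passing a name mirrors setting cluster_context explicitly.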

Launching Pods in external clusters

If some of your tasks require specific resources such as a GPU, you might want to run them in a different cluster than your Airflow instance.

The way that you connect to an external cluster will vary based on where your cluster is hosted and where your Airflow environment is hosted, but generally the following conditions must be met to launch a Pod in an external cluster:

  • Your Airflow environment must have a network connection to the external cluster.
  • Your Airflow environment must have permissions to spin up Pods in the external cluster.
  • Your cluster configuration must be passed to your KubernetesPodOperator tasks either through a task-level configuration or a Kubernetes connection.
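The first condition, network connectivity, can be checked from inside your Airflow environment before you debug anything else. The following sketch opens a TCP connection to the server address from your cluster configuration. api_server_is_reachable is a hypothetical helper. It verifies only that the port accepts connections, and it says nothing about permissions or credentials.

```python
import socket
from urllib.parse import urlsplit


def api_server_is_reachable(server_url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the API server can be opened."""
    parts = urlsplit(server_url)
    # Use the scheme's default port when the URL does not name one.
    port = parts.port or (443 if parts.scheme == "https" else 80)
    try:
        with socket.create_connection((parts.hostname, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Run the check with the server value from your kubeconfig. A False result usually points to a firewall, VPN, or DNS problem rather than a KubernetesPodOperator misconfiguration.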

See the Astro documentation for a more detailed example of how to configure a KubernetesPodOperator task to launch a Pod in an external EKS cluster.

Use the @task.kubernetes decorator

The @task.kubernetes decorator provides an alternative to the traditional KubernetesPodOperator when you run Python scripts in a separate Kubernetes Pod. The Docker image provided to the @task.kubernetes decorator must support executing Python scripts.

As with regular @task-decorated functions, you can pass XComs to the Python script running in the dedicated Kubernetes Pod. If do_xcom_push is set to True in the decorator parameters, the value returned by the decorated function is pushed to XCom. You can learn more about decorators in the Introduction to Airflow decorators guide.

Astronomer recommends using the @task.kubernetes decorator instead of the KubernetesPodOperator when using XCom with Python scripts in a dedicated Kubernetes Pod.

from pendulum import datetime
from airflow.configuration import conf
from airflow.decorators import dag, task
import random

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")


@dag(
    start_date=datetime(2023, 1, 1),
    catchup=False,
    schedule="@daily",
)
def kubernetes_decorator_example_dag():
    @task
    def extract_data():
        # simulating querying from a database
        data_point = random.randint(0, 100)
        return data_point

    @task.kubernetes(
        # specify the Docker image to launch, it needs to be able to run a Python script
        image="python",
        # launch the Pod on the same cluster as Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace as Airflow is running in
        namespace=namespace,
        # Pod configuration
        # naming the Pod
        name="my_pod",
        # log stdout of the container as task logs
        get_logs=True,
        # log events in case of Pod failure
        log_events_on_failure=True,
        # enable pushing to XCom
        do_xcom_push=True,
    )
    def transform(data_point):
        multiplied_data_point = 23 * int(data_point)
        return multiplied_data_point

    @task
    def load_data(**context):
        # pull the XCom value that has been pushed by the transform task
        transformed_data_point = context["ti"].xcom_pull(
            task_ids="transform", key="return_value"
        )
        print(transformed_data_point)

    # load_data pulls the XCom itself, so it is called without arguments
    transform(extract_data()) >> load_data()


kubernetes_decorator_example_dag()

Example: Use the KubernetesPodOperator to run a script in another language

A frequent use case for the KubernetesPodOperator is running a task in a language other than Python. To do this, you build a custom Docker image containing the script.

In the following example, the Haskell script reads the environment variable NAME_TO_GREET and prints a greeting with its value to the console:

import System.Environment (getEnv)

main :: IO ()
main = do
  name <- getEnv "NAME_TO_GREET"
  putStrLn ("Hello, " ++ name)

The Dockerfile creates the necessary environment to run the script and then executes it with a CMD command:

FROM haskell
WORKDIR /opt/hello_name
RUN cabal update
COPY ./haskell_example.cabal /opt/hello_name/haskell_example.cabal
RUN cabal build --only-dependencies -j4
COPY . /opt/hello_name
RUN cabal install
CMD ["haskell_example"]

After making the Docker image available, it can be run from the KubernetesPodOperator with the image argument. The following example DAG showcases a variety of arguments of the KubernetesPodOperator, including how to pass NAME_TO_GREET to the Haskell code.

from airflow import DAG
from pendulum import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.configuration import conf

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")

# set the name that will be printed
name = "your_name"

# instantiate the DAG
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule="@daily",
    dag_id="KPO_different_language_example_dag",
) as dag:
    say_hello_name_in_haskell = KubernetesPodOperator(
        # unique id of the task within the DAG
        task_id="say_hello_name_in_haskell",
        # the Docker image to launch
        image="<image location>",
        # launch the Pod on the same cluster as Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace as Airflow is running in
        namespace=namespace,
        # Pod configuration
        # name the Pod
        name="my_pod",
        # give the Pod name a random suffix, ensure uniqueness in the namespace
        random_name_suffix=True,
        # attach labels to the Pod, can be used for grouping
        labels={"app": "backend", "env": "dev"},
        # reattach to worker instead of creating a new Pod on worker failure
        reattach_on_restart=True,
        # delete Pod after the task is finished
        is_delete_operator_pod=True,
        # get log stdout of the container as task logs
        get_logs=True,
        # log events in case of Pod failure
        log_events_on_failure=True,
        # pass your name as an environment var
        env_vars={"NAME_TO_GREET": f"{name}"},
    )

Example: Use the KubernetesPodOperator with XComs

XCom is a commonly used Airflow feature for passing small amounts of data between tasks. You can use the KubernetesPodOperator to both receive values stored in XCom and push values to XCom.

The following example DAG shows an ETL pipeline with an extract_data task that runs a query on a database and returns a value. The TaskFlow API automatically pushes the return value to XComs.

The transform task is a KubernetesPodOperator that needs the XCom value pushed by the upstream task. It launches an image created with the following Dockerfile:

FROM python

WORKDIR /

# creating the file to write XComs to
RUN mkdir -p airflow/xcom
RUN echo "" > airflow/xcom/return.json

COPY multiply_by_23.py ./

CMD ["python", "./multiply_by_23.py"]

When using XComs with the KubernetesPodOperator, you must create the file airflow/xcom/return.json in your Docker container (ideally from within your Dockerfile), because Airflow only looks for XComs to pull at that specific location. In the following example, the Docker image contains a simple Python script that multiplies an environment variable by 23, packages the result into JSON, and then writes that JSON to the correct file to be retrieved as an XCom. The XComs from the KubernetesPodOperator are pushed only if the task is marked successful.

import json
import os

# import the result of the previous task as an environment variable
data_point = os.environ["DATA_POINT"]

# multiply the data point by 23 and package the result into a JSON object
multiplied_data_point = str(23 * int(data_point))
return_json = {"return_value": multiplied_data_point}

# write valid JSON to the file checked by Airflow for XComs
with open("./airflow/xcom/return.json", "w") as f:
    json.dump(return_json, f)

The load_data task pulls the XCom returned from the transform task and prints it to the console.
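If several scripts in your image push XComs, the write step can be factored into a small helper. The sketch below assumes that the file must contain valid JSON, and that the image uses WORKDIR / as in the Dockerfile above, so the absolute path /airflow/xcom/return.json refers to the same file. write_xcom and XCOM_PATH are hypothetical names introduced for this example.

```python
import json
from pathlib import Path

# Assumption: WORKDIR is /, so this is the file created in the Dockerfile.
XCOM_PATH = Path("/airflow/xcom/return.json")


def write_xcom(value, path: Path = XCOM_PATH) -> None:
    """Serialize a value as JSON into the file that Airflow reads XComs from."""
    # Create the directory if the image did not already provide it.
    path.parent.mkdir(parents=True, exist_ok=True)
    # json.dumps produces double-quoted, parseable JSON, unlike str() on a dict.
    path.write_text(json.dumps(value))
```

Inside the container, the script would end with a call such as write_xcom({"return_value": multiplied_data_point}). The path parameter exists mainly so that the helper can be tested outside the container.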

The full DAG code is provided in the following example. To avoid task failure, turn on do_xcom_push after you create the airflow/xcom/return.json within the Docker container run by the KubernetesPodOperator.

TaskFlow API

from pendulum import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.configuration import conf
from airflow.decorators import dag, task

import random

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")


# instantiate the DAG
@dag(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule="@daily",
)
def KPO_XComs_example_dag():
    @task
    def extract_data():
        # simulating querying from a database
        data_point = random.randint(0, 100)
        return data_point

    transform = KubernetesPodOperator(
        # set task id
        task_id="transform",
        # specify the Docker image to launch
        image="<image location>",
        # launch the Pod on the same cluster as Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace as Airflow is running in
        namespace=namespace,
        # Pod configuration
        # naming the Pod
        name="my_pod",
        # log stdout of the container as task logs
        get_logs=True,
        # log events in case of Pod failure
        log_events_on_failure=True,
        # pull a variable from XComs using Jinja templating and provide it
        # to the Pod as an environment variable
        env_vars={
            "DATA_POINT": """{{ ti.xcom_pull(task_ids='extract_data',
                                             key='return_value') }}"""
        },
        # push the contents of return.json to XCom. Remember to only set this
        # argument to True if you have created the `airflow/xcom/return.json`
        # file within the Docker container run by the KubernetesPodOperator.
        do_xcom_push=True,
    )

    @task
    def load_data(**context):
        # pull the XCom value that has been pushed by the KubernetesPodOperator
        transformed_data_point = context["ti"].xcom_pull(
            task_ids="transform", key="return_value"
        )
        print(transformed_data_point)

    # set dependencies (tasks defined using Decorators need to be called)
    extract_data() >> transform >> load_data()


KPO_XComs_example_dag()

Traditional

from airflow import DAG
from pendulum import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.configuration import conf
from airflow.operators.python import PythonOperator

import random

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")


def extract_data_function():
    # simulating querying from a database
    data_point = random.randint(0, 100)
    return data_point


def load_data_function(**context):
    # pull the XCom value that has been pushed by the KubernetesPodOperator
    transformed_data_point = context["ti"].xcom_pull(
        task_ids="transform", key="return_value"
    )
    print(transformed_data_point)


# instantiate the DAG
with DAG(
    dag_id="KPO_XComs_example_dag",
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule="@daily",
):
    extract_data = PythonOperator(
        task_id="extract_data", python_callable=extract_data_function
    )

    transform = KubernetesPodOperator(
        # set task id
        task_id="transform",
        # specify the Docker image to launch
        image="<image location>",
        # launch the Pod on the same cluster as Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace as Airflow is running in
        namespace=namespace,
        # Pod configuration
        # naming the Pod
        name="my_pod",
        # log stdout of the container as task logs
        get_logs=True,
        # log events in case of Pod failure
        log_events_on_failure=True,
        # pull a variable from XComs using Jinja templating and provide it
        # to the Pod as an environment variable
        env_vars={
            "DATA_POINT": """{{ ti.xcom_pull(task_ids='extract_data',
                                             key='return_value') }}"""
        },
        # push the contents of return.json to XCom. Remember to only set this
        # argument to True if you have created the `airflow/xcom/return.json`
        # file within the Docker container run by the KubernetesPodOperator.
        do_xcom_push=True,
    )

    load_data = PythonOperator(
        task_id="load_data",
        python_callable=load_data_function,
    )

    # set dependencies
    extract_data >> transform >> load_data