Run the KubernetesPodOperator on Astronomer Software

The KubernetesPodOperator is one of the most customizable Apache Airflow operators. A task using the KubernetesPodOperator runs in a dedicated, isolated Kubernetes Pod that terminates after the task completes. To learn more about the benefits and usage of the KubernetesPodOperator, see the KubernetesPodOperator Learn guide.

This guide explains how to complete specific goals using the KubernetesPodOperator on Astronomer Software.

Prerequisites

  • A running Airflow Deployment on Astronomer Software

Set Up the KubernetesPodOperator

Import the operator

  1. Run the following command to install the apache-airflow-providers-cncf-kubernetes package:

    pip install apache-airflow-providers-cncf-kubernetes
  2. Run the following command to import the KubernetesPodOperator:

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

Specify parameters

Instantiate the operator based on your image and setup:

from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

namespace = conf.get("kubernetes", "NAMESPACE")

KubernetesPodOperator(
    namespace=namespace,
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo", "10", "echo pwd"],
    labels={"<pod-label>": "<label-name>"},
    name="airflow-test-pod",
    is_delete_operator_pod=True,
    in_cluster=True,
    task_id="task-two",
    get_logs=True,
)

For each instantiation of the KubernetesPodOperator, you must specify the following values:

  • namespace = conf.get("kubernetes", "NAMESPACE"): Every Deployment runs in its own Kubernetes namespace. This call retrieves the namespace programmatically, so you don’t need to hardcode it.
  • image: This is the Docker image that the operator uses to run its defined task, commands, and arguments. The value you specify is assumed to be an image tag that’s publicly available on Docker Hub. To pull an image from a private registry, read Pull images from a Private Registry.
  • in_cluster=True: When this value is set, your task runs within the cluster from which it’s instantiated, which ensures that the Kubernetes Pod running your task has the correct permissions within the cluster.
  • is_delete_operator_pod=True: This setting ensures that once a KubernetesPodOperator task completes, the Kubernetes Pod that ran it is terminated, so no unused Pods take up resources in your cluster.

Add resources to your Deployment on Astronomer

The KubernetesPodOperator is powered entirely by the resources allocated to the Extra Capacity slider on your Deployment’s Configure page in the Software UI, rather than by a Celery worker (or scheduler resources if you run the Local Executor). Raising the slider increases your namespace’s resource quota so that Airflow has permission to launch Pods within your Deployment’s namespace.

Your Airflow scheduler and webserver remain fixed resources that keep your Deployment up and running and ensure that the rest of your tasks can execute.

For resource allocation, Astronomer recommends starting with 10 AU in Extra Capacity and scaling up from there as needed. If Extra Capacity is set to 0, you’ll see a permissions error:

ERROR - Exception when attempting to create namespace Pod.
Reason: Forbidden
"Failure","message":"pods is forbidden: User \"system:serviceaccount:astronomer-cloud-solar-orbit-4143:solar-orbit-4143-airflow-worker\" cannot create pods in the namespace \"datarouter\"","reason":"Forbidden","details":{"kind":"pods"},"code":403}

On Astronomer Software, the maximum resources a single Pod can request are limited by the size of the nodes in your underlying node pool.

If you need to increase your limit range on Astronomer Software, contact your system admin.

Define resources per task

A notable advantage of leveraging Airflow’s KubernetesPodOperator is that you can control compute resources in the task definition.

If you’re using the Kubernetes Executor, note that this value is separate from the executor_config parameter. In this case, the executor_config would only define the Airflow worker that is launching your Kubernetes task.

Example Task Definition:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

namespace = conf.get('kubernetes', 'NAMESPACE')

# This will detect the default namespace locally and read the
# environment namespace when deployed to Astronomer.
if namespace == 'default':
    config_file = '/usr/local/airflow/include/.kube/config'
    in_cluster = False
else:
    in_cluster = True
    config_file = None

dag = DAG("example_kubernetes_pod", schedule="@once", default_args=default_args)

# This is where you define your resource allocation.
compute_resources = k8s.V1ResourceRequirements(
    limits={"cpu": "800m", "memory": "3Gi"},
    requests={"cpu": "800m", "memory": "3Gi"},
)

with dag:
    KubernetesPodOperator(
        namespace=namespace,
        image="hello-world",
        labels={"foo": "bar"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster,  # if True, looks in the cluster; if False, looks for the config file
        cluster_context="docker-for-desktop",  # ignored when in_cluster is True
        config_file=config_file,
        container_resources=compute_resources,
        is_delete_operator_pod=True,
        get_logs=True,
    )

In the example above, the resources are defined by building the following V1ResourceRequirements object:

from kubernetes.client import models as k8s

compute_resources = k8s.V1ResourceRequirements(
    limits={"cpu": "800m", "memory": "3Gi"},
    requests={"cpu": "800m", "memory": "3Gi"},
)

This object allows you to specify memory and CPU requests and limits for any given task and its corresponding Kubernetes Pod. For more information, read the Kubernetes documentation on requests and limits.
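The quantity strings in those dictionaries use Kubernetes suffixes: m means milli-CPU (thousandths of a core) and Gi/Mi mean gibibytes/mebibytes. The small parser below is a simplified illustration of what the values above mean, not part of the Kubernetes API:

```python
def parse_cpu(quantity: str) -> float:
    # "800m" -> 0.8 cores; a plain number means whole cores.
    return float(quantity[:-1]) / 1000 if quantity.endswith("m") else float(quantity)

def parse_memory_gi(quantity: str) -> float:
    # "3Gi" -> 3.0 gibibytes; "384Mi" -> 0.375 gibibytes.
    if quantity.endswith("Gi"):
        return float(quantity[:-2])
    if quantity.endswith("Mi"):
        return float(quantity[:-2]) / 1024
    raise ValueError(f"unsupported quantity: {quantity}")

print(parse_cpu("800m"), parse_memory_gi("3Gi"))  # 0.8 3.0
```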

Once you’ve created the object, apply it to the container_resources parameter of the task. When this DAG runs, it launches a Pod that runs the hello-world image, pulled from Docker Hub, in your Airflow Deployment’s namespace with the resource requests defined above. Once the task finishes, the Pod is gracefully terminated.

On Astronomer, the equivalent of 1AU is: requests={"cpu": "100m", "memory": "384Mi"}, limits={"cpu": "100m", "memory": "384Mi"}.
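Using that equivalence, a small hypothetical helper (not part of any Astronomer or Airflow API) can convert an AU count into the corresponding request/limit dictionaries:

```python
def au_to_resources(au: int) -> dict:
    # 1 AU = 100m CPU and 384Mi memory (Astronomer's documented equivalence).
    return {"cpu": f"{au * 100}m", "memory": f"{au * 384}Mi"}

# The recommended 10 AU of Extra Capacity corresponds to:
print(au_to_resources(10))  # {'cpu': '1000m', 'memory': '3840Mi'}
```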

Pulling images from a private registry

By default, the KubernetesPodOperator looks for images hosted publicly on Docker Hub. To pull images from a private registry instead, some additional configuration is required.

To pull images from a private registry on Astronomer Software:

  1. Retrieve a config.json file that contains your Docker credentials by following the Docker documentation. The generated file should look something like this:

    {
      "auths": {
        "https://index.docker.io/v1/": {
          "auth": "c3R...zE2"
        }
      }
    }
  2. Follow the Kubernetes documentation to create a secret based on your credentials.

  3. In your DAG code, import models from kubernetes.client and specify image_pull_secrets with your Kubernetes secret. After configuring this value, you can pull an image as you would from a public registry like in the following example.

    from kubernetes.client import models as k8s

    KubernetesPodOperator(
        namespace=namespace,
        image_pull_secrets=[k8s.V1LocalObjectReference("<your-secret-name>")],
        image="<your-docker-image>",
        cmds=["<commands-for-image>"],
        arguments=["<arguments-for-image>"],
        labels={"<pod-label>": "<label-name>"},
        name="<pod-name>",
        is_delete_operator_pod=True,
        in_cluster=True,
        task_id="<task-name>",
        get_logs=True,
    )
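The auth value in the config.json from step 1 is the Base64 encoding of username:password. If you need to verify or regenerate it, you can reproduce the encoding with the Python standard library (the credentials below are placeholders):

```python
import base64

# Placeholder credentials -- substitute your own registry login.
username, password = "my-user", "my-token"
auth = base64.b64encode(f"{username}:{password}".encode()).decode()
print(auth)  # this string belongs in the "auth" field of config.json
```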

Local testing

Astronomer recommends testing your DAGs locally before pushing them to a Deployment on Astronomer. For more information, read How to run the KubernetesPodOperator locally. That guide provides information on how to use MicroK8s or Docker for Kubernetes to run tasks with the KubernetesPodOperator in a local environment.

To pull images from a private registry locally, create a secret in your local Kubernetes namespace and reference it in your operator the same way as described above.