Run the Kubernetes executor on Astronomer Software

The Kubernetes executor creates an individual Pod for each task. For each task that needs to run, the executor calls the Kubernetes API to dynamically launch a Pod, which terminates when the task completes.

Because each task runs in its own Pod, resource usage scales with the number of Airflow tasks you're running at a given time. You can also configure the following for each individual Airflow task:

  • Memory allocation
  • Service accounts
  • Airflow image

To configure these resources for a given task’s Pod, you specify a pod_override in your DAG code. To specify a Pod template for many or all of your tasks, you can write a helper function to construct a pod_override in your DAGs or configure a global setting. For more information on configuring Pod template values, reference the Kubernetes documentation.

Prerequisites

You must have an Airflow Deployment on Astronomer running with the Kubernetes executor. For more information on configuring an executor, see Configure a Deployment. To learn more about different executor types, see Airflow executors explained.

You can disable the automatic setting of worker CPU and memory resources from the API or UI by using the workers.resources.enabled option for the KubernetesExecutor.

For complete details, including YAML examples and guidance for both default and custom configurations, see Manage Kubernetes worker CPU and memory outside UI/API.
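The exact location of this option in your values.yaml depends on your platform version; assuming it follows the same executors layout used for ephemeralStorage later in this guide, disabling it might look like the following sketch:

```yaml
astronomer:
  houston:
    config:
      deployments:
        executors:
          - name: KubernetesExecutor
            workers:
              resources:
                enabled: false
```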

Configure the default worker Pod for all Deployments

By default, the Kubernetes executor launches workers based on a podTemplate configuration in the Astronomer Airflow Helm chart.

You can modify the default podTemplate to configure the default worker Pods for all Deployments using the Kubernetes executor on your Astronomer Software installation. You can then override this default at the task level using a pod_override file. See Configure the worker Pod for a specific task.

  1. In your values.yaml file, copy the complete podTemplate configuration from your version of the Astronomer Airflow Helm chart. Your file should look like the following:

    ```yaml
    astronomer:
      houston:
        config:
          deployments:
            helm:
              airflow:
                podTemplate: |
                  # Licensed to the Apache Software Foundation (ASF) under one
                  # or more contributor license agreements. See the NOTICE file
                  # distributed with this work for additional information
                  # regarding copyright ownership. The ASF licenses this file
                  # to you under the Apache License, Version 2.0 (the
                  # "License"); you may not use this file except in compliance
                  # with the License. You may obtain a copy of the License at
                  #
                  # http://www.apache.org/licenses/LICENSE-2.0
                  #
                  # Unless required by applicable law or agreed to in writing,
                  # software distributed under the License is distributed on an
                  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
                  # KIND, either express or implied. See the License for the
                  # specific language governing permissions and limitations
                  # under the License.
                  ---
                  {{- $nodeSelector := or .Values.nodeSelector .Values.workers.nodeSelector }}
                  {{- $affinity := or .Values.affinity .Values.workers.affinity }}
                  {{- $tolerations := or .Values.tolerations .Values.workers.tolerations }}
                  {{- $securityContext := include "airflowPodSecurityContext" (list . .Values.workers) }}
                  {{- $containerSecurityContext := include "containerSecurityContext" (list . .Values.workers) }}
                  apiVersion: v1
                  kind: Pod
                  metadata:
                    name: astronomer-pod-template-file
                    labels:
                      tier: airflow
                      component: worker
                      release: {{ .Release.Name }}
                      {{- if or (.Values.labels) (.Values.workers.labels) }}
                      {{- mustMerge .Values.workers.labels .Values.labels | toYaml | nindent 4 }}
                      {{- end }}
                    {{- if or .Values.airflowPodAnnotations .Values.workers.podAnnotations }}
                    annotations:
                      {{- if .Values.airflowPodAnnotations }}
                      {{- toYaml .Values.airflowPodAnnotations | nindent 4 }}
                      {{- end }}
                      {{- if .Values.workers.podAnnotations }}
                      {{- toYaml .Values.workers.podAnnotations | nindent 4 }}
                      {{- end }}
                    {{- end }}
                  spec:
                    {{- if or (and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled)) .Values.workers.extraInitContainers }}
                    initContainers:
                      {{- if and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled) }}
                      {{- include "git_sync_container" (dict "Values" .Values "is_init" "true") | nindent 4 }}
                      {{- end }}
                      {{- if .Values.workers.extraInitContainers }}
                      {{- toYaml .Values.workers.extraInitContainers | nindent 4 }}
                      {{- end }}
                    {{- end }}
                    containers:
                      - args: []
                        command: []
                        envFrom:
                        {{- include "custom_airflow_environment_from" . | default "\n  []" | indent 6 }}
                        env:
                          - name: AIRFLOW__CORE__EXECUTOR
                            value: LocalExecutor
                        {{- include "standard_airflow_environment" . | indent 6}}
                        {{- include "custom_airflow_environment" . | indent 6 }}
                        {{- include "container_extra_envs" (list . .Values.workers.env) | indent 6 }}
                        image: {{ template "pod_template_image" . }}
                        imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
                        securityContext: {{ $containerSecurityContext | nindent 8 }}
                        name: base
                        ports: []
                        resources: {{- toYaml .Values.workers.resources | nindent 8 }}
                        volumeMounts:
                          - mountPath: {{ template "airflow_logs" . }}
                            name: logs
                          - name: config
                            mountPath: {{ template "airflow_config_path" . }}
                            subPath: airflow.cfg
                            readOnly: true
                          {{- if .Values.airflowLocalSettings }}
                          - name: config
                            mountPath: {{ template "airflow_local_setting_path" . }}
                            subPath: airflow_local_settings.py
                            readOnly: true
                          {{- end }}
                          {{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}
                          {{- include "airflow_dags_mount" . | nindent 8 }}
                          {{- end }}
                          {{- if .Values.workers.extraVolumeMounts }}
                          {{ toYaml .Values.workers.extraVolumeMounts | indent 8 }}
                          {{- end }}
                      {{- if .Values.workers.extraContainers }}
                      {{- toYaml .Values.workers.extraContainers | nindent 4 }}
                      {{- end }}
                    hostNetwork: false
                    {{- if .Values.workers.priorityClassName }}
                    priorityClassName: {{ .Values.workers.priorityClassName }}
                    {{- end }}
                    {{- if .Values.workers.runtimeClassName }}
                    runtimeClassName: {{ .Values.workers.runtimeClassName }}
                    {{- end }}
                    {{- if .Values.workers.hostAliases }}
                    hostAliases: {{- toYaml .Values.workers.hostAliases | nindent 4 }}
                    {{- end }}
                    {{- if or .Values.registry.secretName .Values.registry.connection }}
                    imagePullSecrets:
                      - name: {{ template "registry_secret" . }}
                    {{- end }}
                    restartPolicy: Never
                    nodeSelector: {{ toYaml $nodeSelector | nindent 4 }}
                    securityContext: {{ $securityContext | nindent 4 }}
                    affinity: {{ toYaml $affinity | nindent 4 }}
                    tolerations: {{ toYaml $tolerations | nindent 4 }}
                    serviceAccountName: {{ include "worker.serviceAccountName" . }}
                    volumes:
                      {{- if .Values.dags.persistence.enabled }}
                      - name: dags
                        persistentVolumeClaim:
                          claimName: {{ template "airflow_dags_volume_claim" . }}
                      {{- else if .Values.dags.gitSync.enabled }}
                      - name: dags
                        emptyDir: {}
                      {{- end }}
                      {{- if .Values.logs.persistence.enabled }}
                      - name: logs
                        persistentVolumeClaim:
                          claimName: {{ template "airflow_logs_volume_claim" . }}
                      {{- else }}
                      - emptyDir: {}
                        name: logs
                      {{- end }}
                      {{- if and .Values.dags.gitSync.enabled .Values.dags.gitSync.sshKeySecret }}
                      {{- include "git_sync_ssh_key_volume" . | nindent 2 }}
                      {{- end }}
                      - configMap:
                          name: {{ include "airflow_config" . }}
                        name: config
                      {{- if .Values.workers.extraVolumes }}
                      {{ toYaml .Values.workers.extraVolumes | nindent 2 }}
                      {{- end }}
    ```
  2. Customize the pod template based on your use case, such as by setting default CPU and memory requests and limits. For more information on configuring Pod template values, see the Kubernetes documentation.

  3. Push the configuration change to your platform. See Apply a config change.
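Because the default pod template populates resources from .Values.workers.resources, one common customization is a values fragment that sets default requests and limits for all worker Pods. The following is a sketch; the resource amounts are illustrative:

```yaml
astronomer:
  houston:
    config:
      deployments:
        helm:
          airflow:
            workers:
              resources:
                requests:
                  cpu: "500m"
                  memory: "1Gi"
                limits:
                  cpu: "1"
                  memory: "2Gi"
```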

Configure the worker Pod for a specific task

For each task run with the Kubernetes executor, you can customize its individual worker Pod and override the Astronomer Software defaults by specifying a pod_override configuration in your DAG code.

  1. Add the following import to your DAG file:

    ```python
    from kubernetes.client import models as k8s
    ```
  2. Add a pod_override configuration to the DAG file containing the task. See the kubernetes-client GitHub for a list of all possible settings you can include in the configuration.

  3. Specify the pod_override in the task’s parameters.

Example: Set CPU or memory limits and requests

One of the most common use cases for customizing a Kubernetes worker Pod is to request a specific amount of resources for a task.

The following example shows how you can use a pod_override configuration in your DAG code to request custom resources for a task:

```python
import time

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.example_dags.libs.helper import print_stuff
from kubernetes.client import models as k8s

k8s_exec_config_resource_requirements = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    resources=k8s.V1ResourceRequirements(
                        requests={"cpu": 0.5, "memory": "1024Mi"},
                        limits={"cpu": 0.5, "memory": "1024Mi"}
                    )
                )
            ]
        )
    )
}

with DAG(
    dag_id="example_kubernetes_executor_pod_override_sources",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False
):
    BashOperator(
        task_id="bash_resource_requirements_override_example",
        bash_command="echo hi",
        executor_config=k8s_exec_config_resource_requirements
    )

    @task(executor_config=k8s_exec_config_resource_requirements)
    def resource_requirements_override_example():
        print_stuff()
        time.sleep(60)

    resource_requirements_override_example()
```

When this DAG runs, it launches a Kubernetes Pod with exactly 0.5 CPUs (500m) and 1024Mi of memory, as long as those resources are available in your cluster. Once the task finishes, the Pod terminates gracefully.

You can also configure the default request and limit values for CPU and memory for KubernetesExecutor Pods in the Software UI. See more about configuring custom resources in the UI.

Manage default ephemeral storage configurations

You can use the ephemeralStorage.disabled configuration to control whether your task Pods receive ephemeral storage amounts from your default configurations or only from a pod_override configuration.

The following Houston API configuration for ephemeralStorage.disabled is set to true by default. When set to true, Astronomer does not configure any ephemeral storage. However, you can still set ephemeral storage at the DAG level by using a pod_override.

```yaml
astronomer:
  houston:
    config:
      deployments:
        executors:
          - name: KubernetesExecutor
            workers:
              ephemeralStorage:
                disabled: true
```

You can configure ephemeral storage for all Deployments using the resource configurations set in your Houston configmap by setting ephemeralStorage.disabled to false.

```yaml
astronomer:
  houston:
    config:
      deployments:
        executors:
          - name: KubernetesExecutor
            workers:
              ephemeralStorage:
                disabled: false
```

Mount secret environment variables to worker Pods

Deployment environment variables marked as secrets are stored in a Kubernetes Secret named <release-name>-env in your Deployment's namespace. To use a secret value in a task running on the Kubernetes executor, mount the secret to the Pod running the task.

  1. Run the following command to find the namespace (release name) of your Airflow Deployment:

    ```sh
    kubectl get ns
    ```
  2. Add the following import to your DAG file:

    ```python
    from airflow.kubernetes.secret import Secret
    ```
  3. Define a Kubernetes Secret in your DAG instantiation using the following format:

    ```python
    secret_env = Secret(deploy_type="env", deploy_target="<SECRET_KEY>", secret="<release-name>-env", key="<SECRET_KEY>")
    namespace = conf.get("kubernetes", "namespace")
    ```
  4. Specify the Secret in the secret_key_ref section of your pod_override configuration.

  5. In the task where you want to use the secret value, add the following task-level argument:

    ```python
    op_kwargs={
        "env_name": secret_env.deploy_target
    },
    ```
  6. In the executable for the task, call the secret value using os.environ[env_name].

In the following example, a secret named MY_SECRET is pulled from infrared-photon-7780-env and printed to logs.

```python
import pendulum
from kubernetes.client import models as k8s

from airflow.configuration import conf
from airflow.kubernetes.secret import Secret
from airflow.models import DAG
from airflow.operators.python import PythonOperator


def print_env(env_name):
    import os
    print(os.environ[env_name])


with DAG(
    dag_id='test-secret',
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    end_date=pendulum.datetime(2022, 1, 5, tz="UTC"),
    schedule_interval="@once",
) as dag:
    secret_env = Secret(deploy_type="env", deploy_target="MY_SECRET", secret="infrared-photon-7780-env", key="MY_SECRET")
    namespace = conf.get("kubernetes", "namespace")

    p = PythonOperator(
        python_callable=print_env,
        op_kwargs={
            "env_name": secret_env.deploy_target
        },
        task_id='test-py-env',
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            env=[
                                k8s.V1EnvVar(
                                    name=secret_env.deploy_target,
                                    value_from=k8s.V1EnvVarSource(
                                        secret_key_ref=k8s.V1SecretKeySelector(
                                            name=secret_env.secret,
                                            key=secret_env.key,
                                        )
                                    ),
                                )
                            ],
                        )
                    ]
                )
            ),
        }
    )
```