Run the Kubernetes executor on Astronomer Software

The Kubernetes executor creates individual Pods that dynamically delegate work and resources to individual tasks. For each task that needs to run, the executor calls the Kubernetes API to launch a Pod, which terminates when the task completes.

Because worker Pods are created on demand, your resource usage scales with the number of Airflow tasks you're running at a given time. You can also configure the following for each individual Airflow task:

  • Memory allocation
  • Service accounts
  • Airflow image

To configure these resources for a given task's Pod, specify a pod_override in your DAG code. To apply a Pod template to many or all of your tasks, write a helper function that constructs the pod_override in your DAGs (as in the sketch below), or configure a global setting. For more information on configuring Pod template values, see the Kubernetes documentation.
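
For example, here is a minimal sketch of such a helper. The function name make_pod_override and its default values are illustrative assumptions, not part of the Astronomer or Airflow APIs:

    from kubernetes.client import models as k8s

    # Hypothetical helper: builds a reusable executor_config for many tasks.
    def make_pod_override(cpu="500m", memory="512Mi", image=None):
        return {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",  # "base" is the main task container in the worker Pod
                            image=image,
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": cpu, "memory": memory},
                                limits={"cpu": cpu, "memory": memory},
                            ),
                        )
                    ]
                )
            )
        }

Tasks can then share the same defaults with executor_config=make_pod_override(), or override a single value per task, for example executor_config=make_pod_override(memory="2Gi").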

Prerequisites

You must have an Airflow Deployment on Astronomer running with the Kubernetes executor. For more information on configuring an executor, see Configure a Deployment. To learn more about different executor types, see Airflow executors explained.

Configure the default worker Pod for all Deployments

By default, the Kubernetes executor launches workers based on a podTemplate configuration in the Astronomer Airflow Helm chart.

You can modify the default podTemplate to configure the default worker Pods for all Deployments using the Kubernetes executor on your Astronomer Software installation. You can then override this default at the task level using a pod_override configuration. See Configure the worker Pod for a specific task.

  1. In your values.yaml file, copy the complete podTemplate configuration from your version of the Astronomer Airflow Helm chart. Your file should look like the following:

    astronomer:
      houston:
        config:
          deployments:
            helm:
              airflow:
                podTemplate: |
                  # Licensed to the Apache Software Foundation (ASF) under one
                  # or more contributor license agreements. See the NOTICE file
                  # distributed with this work for additional information
                  # regarding copyright ownership. The ASF licenses this file
                  # to you under the Apache License, Version 2.0 (the
                  # "License"); you may not use this file except in compliance
                  # with the License. You may obtain a copy of the License at
                  #
                  #   http://www.apache.org/licenses/LICENSE-2.0
                  #
                  # Unless required by applicable law or agreed to in writing,
                  # software distributed under the License is distributed on an
                  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
                  # KIND, either express or implied. See the License for the
                  # specific language governing permissions and limitations
                  # under the License.
                  ---
                  {{- $nodeSelector := or .Values.nodeSelector .Values.workers.nodeSelector }}
                  {{- $affinity := or .Values.affinity .Values.workers.affinity }}
                  {{- $tolerations := or .Values.tolerations .Values.workers.tolerations }}
                  apiVersion: v1
                  kind: Pod
                  metadata:
                    name: astronomer-pod-template-file
                    labels:
                      tier: airflow
                      component: worker
                      release: {{ .Release.Name }}
                  {{- with .Values.labels }}
                  {{ toYaml . | indent 4 }}
                  {{- end }}
                    {{- if .Values.airflowPodAnnotations }}
                    annotations:
                      {{- toYaml .Values.airflowPodAnnotations | nindent 4 }}
                    {{- end }}
                  spec:
                    {{- if or (and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled)) .Values.workers.extraInitContainers }}
                    initContainers:
                    {{- if and .Values.dags.gitSync.enabled (not .Values.dags.persistence.enabled) }}
                      {{- include "git_sync_container" (dict "Values" .Values "is_init" "true") | nindent 4 }}
                    {{- end }}
                    {{- if .Values.workers.extraInitContainers }}
                      {{- toYaml .Values.workers.extraInitContainers | nindent 4 }}
                    {{- end }}
                    {{- end }}
                    containers:
                      - args: []
                        command: []
                        envFrom:
                        {{- include "custom_airflow_environment_from" . | default "\n  []" | indent 6 }}
                        env:
                          - name: AIRFLOW__CORE__EXECUTOR
                            value: LocalExecutor
                        {{- include "standard_airflow_environment" . | indent 6 }}
                        {{- include "custom_airflow_environment" . | indent 6 }}
                        image: {{ template "pod_template_image" . }}
                        imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
                        name: base
                        ports: []
                        volumeMounts:
                          - mountPath: {{ template "airflow_logs" . }}
                            name: logs
                          - name: config
                            mountPath: {{ template "airflow_config_path" . }}
                            subPath: airflow.cfg
                            readOnly: true
                          {{- if .Values.airflowLocalSettings }}
                          - name: config
                            mountPath: {{ template "airflow_local_setting_path" . }}
                            subPath: airflow_local_settings.py
                            readOnly: true
                          {{- end }}
                          {{- if or .Values.dags.gitSync.enabled .Values.dags.persistence.enabled }}
                          {{- include "airflow_dags_mount" . | nindent 8 }}
                          {{- end }}
                          {{- if .Values.workers.extraVolumeMounts }}
                  {{ toYaml .Values.workers.extraVolumeMounts | indent 8 }}
                          {{- end }}
                    {{- if .Values.workers.extraContainers }}
                      {{- toYaml .Values.workers.extraContainers | nindent 4 }}
                    {{- end }}
                    hostNetwork: false
                    {{- if or .Values.registry.secretName .Values.registry.connection }}
                    imagePullSecrets:
                      - name: {{ template "registry_secret" . }}
                    {{- end }}
                    restartPolicy: Never
                    securityContext:
                      runAsUser: {{ .Values.uid }}
                      fsGroup: {{ .Values.gid }}
                    nodeSelector: {{ toYaml $nodeSelector | nindent 4 }}
                    affinity: {{ toYaml $affinity | nindent 4 }}
                    tolerations: {{ toYaml $tolerations | nindent 4 }}
                    serviceAccountName: {{ include "worker.serviceAccountName" . }}
                    volumes:
                    {{- if .Values.dags.persistence.enabled }}
                      - name: dags
                        persistentVolumeClaim:
                          claimName: {{ template "airflow_dags_volume_claim" . }}
                    {{- else if .Values.dags.gitSync.enabled }}
                      - name: dags
                        emptyDir: {}
                    {{- end }}
                    {{- if .Values.logs.persistence.enabled }}
                      - name: logs
                        persistentVolumeClaim:
                          claimName: {{ template "airflow_logs_volume_claim" . }}
                    {{- else }}
                      - emptyDir: {}
                        name: logs
                    {{- end }}
                    {{- if and .Values.dags.gitSync.enabled .Values.dags.gitSync.sshKeySecret }}
                    {{- include "git_sync_ssh_key_volume" . | nindent 2 }}
                    {{- end }}
                      - configMap:
                          name: {{ include "airflow_config" . }}
                        name: config
                    {{- if .Values.workers.extraVolumes }}
                  {{ toYaml .Values.workers.extraVolumes | nindent 2 }}
                    {{- end }}
  2. Customize the pod template based on your use case, such as by setting default requests and limits on CPU and memory, as in the sketch below. For more information on configuring Pod template values, see the Kubernetes documentation.
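
     For example, to give every worker Pod default CPU and memory settings, you might add a resources section to the container named base inside podTemplate. This is a sketch only, and the values are illustrative:

        # Inside podTemplate, under the "base" container (values are examples):
        resources:
          requests:
            cpu: "0.5"
            memory: 512Mi
          limits:
            cpu: "1"
            memory: 1Gi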

  3. Push the configuration change to your platform. See Apply a config change.

Configure the worker Pod for a specific task

For each task run with the Kubernetes executor, you can customize the task's individual worker Pod and override the defaults used in Astronomer Software by specifying a pod_override configuration.

  1. Add the following import to your DAG file:

    from kubernetes.client import models as k8s
  2. Add a pod_override configuration to the DAG file containing the task. See the kubernetes-client GitHub for a list of all possible settings you can include in the configuration.

  3. Specify the pod_override in the task’s parameters.

Example: Set CPU or memory limits and requests

One of the most common use cases for customizing a Kubernetes worker Pod is to request a specific amount of resources for a task.

The following example shows how you can use a pod_override configuration in your DAG code to request custom resources for a task:

    import pendulum
    import time

    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.bash import BashOperator
    from airflow.example_dags.libs.helper import print_stuff
    from kubernetes.client import models as k8s

    k8s_exec_config_resource_requirements = {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": 0.5, "memory": "1024Mi"},
                            limits={"cpu": 0.5, "memory": "1024Mi"}
                        )
                    )
                ]
            )
        )
    }

    with DAG(
        dag_id="example_kubernetes_executor_pod_override_sources",
        schedule=None,
        start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
        catchup=False
    ):
        BashOperator(
            task_id="bash_resource_requirements_override_example",
            bash_command="echo hi",
            executor_config=k8s_exec_config_resource_requirements
        )

        @task(executor_config=k8s_exec_config_resource_requirements)
        def resource_requirements_override_example():
            print_stuff()
            time.sleep(60)

        resource_requirements_override_example()

When this DAG runs, it launches a Kubernetes Pod that requests and is limited to exactly 0.5 CPU (500m) and 1024Mi of memory, as long as those resources are available in your cluster. Once the task finishes, the Pod terminates gracefully.
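
The same pod_override pattern covers the other per-task settings listed earlier, such as the service account or the Airflow image. The following sketch assumes a hypothetical service account custom-worker-sa and a placeholder image tag; substitute values that exist in your cluster and registry:

    from kubernetes.client import models as k8s

    # Illustrative values only: the service account and image tag are placeholders.
    k8s_exec_config_custom_identity = {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                service_account_name="custom-worker-sa",
                containers=[
                    k8s.V1Container(
                        name="base",
                        image="registry.example.com/my-airflow:custom-tag",
                    )
                ],
            )
        )
    }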

Mount secret environment variables to worker Pods

Deployment environment variables marked as secrets are stored in a Kubernetes Secret named <release-name>-env in your Deployment's namespace. To use a secret value in a task running with the Kubernetes executor, mount the Secret to the Pod running the task.

  1. Run the following command to find the namespace (release name) of your Airflow Deployment:

    kubectl get ns
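
     The output lists one namespace per Deployment and looks something like the following; the names and ages here are illustrative, with infrared-photon-7780 standing in for a Deployment release name:

    NAME                   STATUS   AGE
    astronomer             Active   120d
    infrared-photon-7780   Active   30d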
  2. Add the following imports to your DAG file. The conf import is used in the next step to retrieve the Deployment's namespace:

    from airflow.configuration import conf
    from airflow.kubernetes.secret import Secret
  3. Define a Kubernetes Secret in your DAG instantiation using the following format:

    secret_env = Secret(deploy_type="env", deploy_target="<SECRET_KEY>", secret="<release-name>-env", key="<SECRET_KEY>")
    namespace = conf.get("kubernetes", "namespace")
  4. Specify the Secret in the secret_key_ref section of your pod_override configuration.

  5. In the task where you want to use the secret value, add the following task-level argument:

    op_kwargs={
        "env_name": secret_env.deploy_target
    },
  6. In the task's Python callable, read the secret value using os.environ[env_name].

In the following example, a secret named MY_SECRET is pulled from the Kubernetes Secret infrared-photon-7780-env and printed to the task logs.

    import pendulum

    from airflow.configuration import conf
    from airflow.kubernetes.secret import Secret
    from airflow.models import DAG
    from airflow.operators.python import PythonOperator
    from kubernetes.client import models as k8s


    def print_env(env_name):
        import os
        print(os.environ[env_name])


    with DAG(
        dag_id='test-secret',
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        end_date=pendulum.datetime(2022, 1, 5, tz="UTC"),
        schedule_interval="@once",
    ) as dag:
        secret_env = Secret(deploy_type="env", deploy_target="MY_SECRET", secret="infrared-photon-7780-env", key="MY_SECRET")
        # Retrieve the Deployment's namespace from the Airflow configuration
        namespace = conf.get("kubernetes", "namespace")

        p = PythonOperator(
            python_callable=print_env,
            op_kwargs={
                "env_name": secret_env.deploy_target
            },
            task_id='test-py-env',
            executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                env=[
                                    k8s.V1EnvVar(
                                        name=secret_env.deploy_target,
                                        value_from=k8s.V1EnvVarSource(
                                            secret_key_ref=k8s.V1SecretKeySelector(
                                                name=secret_env.secret,
                                                key=secret_env.key
                                            )
                                        ),
                                    )
                                ],
                            )
                        ]
                    )
                ),
            }
        )