
Airflow 2.0 + Kubernetes

Watch Video On Demand

Recorded On

April 2021

Hosted By

  • Daniel Imberman
  • Viraj Parekh

Note: This webinar was recorded in April 2021 and many exciting features have been added to Airflow and Astro since then. We recommend you also check out our in-depth guide on how to Use the KubernetesPodOperator. Astronomer customers can learn more about the KubernetesExecutor and the KubernetesPodOperator on Astro in our documentation.

Topics covered in this webinar:

Kubernetes Executor

Old Architecture

(Diagram: the old KubernetesExecutor architecture)

New Architecture

(Diagram: the new KubernetesExecutor architecture in Airflow 2.0)

pod_template_file

    apiVersion: v1
    kind: Pod
    metadata:
      name: dummy-name
    spec:
      containers:
        - env:
            - name: AIRFLOW__CORE__EXECUTOR
              value: LocalExecutor
            # Hard Coded Airflow Envs
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: RELEASE-NAME-fernet-key
                  key: fernet-key
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              ….
            # Extra env
          image: apache/airflow:2.0.0
          imagePullPolicy: IfNotPresent
          name: base
          ports: []
          volumeMounts:
            - mountPath: "/opt/airflow/logs"
              name: airflow-logs
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
      hostNetwork: false
      restartPolicy: Never
      securityContext:
        runAsUser: 50000
        fsGroup: 0
      serviceAccountName: 'RELEASE-NAME-worker'
      volumes:
        # Completed to match the volumeMounts above; the configMap name follows
        # the RELEASE-NAME placeholder pattern used elsewhere in this template
        - name: airflow-logs
          emptyDir: {}
        - name: config
          configMap:
            name: RELEASE-NAME-airflow-config
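
To have the KubernetesExecutor pick up a template like this one, point Airflow at the file with the pod_template_file option in the [kubernetes] section of airflow.cfg, or its environment-variable equivalent. A minimal sketch, assuming the template is baked into your image at the path shown:

    # assumed path; adjust to wherever the template lives in your image
    AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE=/opt/airflow/pod_templates/pod_template_file.yaml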

executor_config

    from airflow.operators.python import PythonOperator
    from kubernetes.client import models as k8s

    volume_task = PythonOperator(
      task_id="task_with_volume",
      python_callable=test_volume_mount,
      executor_config={
          "pod_override": k8s.V1Pod(
              spec=k8s.V1PodSpec(
                  containers=[
                      k8s.V1Container(
                          name="base",
                          volume_mounts=[
                              k8s.V1VolumeMount(
                                  mount_path="/foo/", 
                                  name="test-volume"
                              )
                          ],
                      )
                  ],
                  volumes=[
                      k8s.V1Volume(
                          name="test-volume",
                          host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                      )
                  ],
              )
          ),
      },
    )

You can even point to a custom pod_template_file and then add overrides on top of it!

    import os

    from airflow.operators.python import PythonOperator
    from kubernetes.client import models as k8s

    task_with_template = PythonOperator(
      task_id="task_with_template",
      python_callable=print_stuff,
      executor_config={
          "pod_template_file": os.path.join(
              AIRFLOW_HOME, "pod_templates/basic_template.yaml"
          ),
          "pod_override": k8s.V1Pod(
              metadata=k8s.V1ObjectMeta(labels={"release": "stable"})
          ),
      },
    )

KubernetesPodOperator

The KubernetesPodOperator (KPO) now lives in the cncf.kubernetes provider package. Visit the KubernetesPodOperator page of the Astronomer Registry to learn more.
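
If you manage your own Airflow deployment, note that the provider ships separately from core Airflow and can be installed with pip:

    pip install apache-airflow-providers-cncf-kubernetes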

KPO now uses the Kubernetes API directly

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
    from kubernetes.client import models as k8s

    volume = k8s.V1Volume(
      name='test-volume',
      persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
    )

    volume_mounts = [
      k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
    ]

    env_vars = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]

    k = KubernetesPodOperator(
      # task_id simplified from the original test-suite helper; name, namespace,
      # and image are illustrative values added so the snippet stands alone
      task_id="task-with-volume",
      name="test-pod",
      namespace="default",
      image="ubuntu:16.04",
      in_cluster=False,
      volume_mounts=volume_mounts,
      volumes=[volume],
      env_vars=env_vars,
      do_xcom_push=True,
    )
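
One detail worth noting: with do_xcom_push=True, KPO runs a sidecar container and expects the main container to write its result as JSON to /airflow/xcom/return.json; the contents of that file become the task's XCom value.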

KPO now also supports pod templates

    template_path = '/airflow/dags/basic_pod.yaml'
    pod_spec = k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(
            labels={"foo": "bar", "fizz": "buzz"},
        ),
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    env=[k8s.V1EnvVar(name="env_name", value="value")],
                )
            ]
        ),
    )

    env_vars = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]
    k = KubernetesPodOperator(
        task_id="task-with-template",  # simplified from the original test-suite helper
        in_cluster=False,
        pod_template_file=template_path,
        full_pod_spec=pod_spec,
        env_vars=env_vars,
        do_xcom_push=True,
    )
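
When these options are combined, KPO layers them: the pod_template_file is the base, fields set in full_pod_spec override it, and explicit operator arguments such as env_vars are applied on top.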

KEDA Autoscaler

With the KubernetesExecutor, Airflow talks to the Kubernetes API for every task you launch, creating a dedicated pod that runs that task to completion. This works well for small- to medium-scale use cases, but at very large scale, with thousands of tasks at a time, the KubernetesExecutor can become unwieldy.

The KEDA autoscaler lets you build custom autoscalers driven by external event sources. At Astronomer, we created a PostgreSQL scaler, which scales workers based on a query against the Airflow metadata database, and donated it back to the KEDA project.
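
As a sketch of what this looks like in practice (KEDA 2.x syntax; the Deployment name, connection environment variable, and worker concurrency of 16 are illustrative assumptions), a ScaledObject can scale Celery workers based on the number of running and queued tasks in the metadata database:

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: airflow-worker
    spec:
      scaleTargetRef:
        name: airflow-worker        # the worker Deployment to scale (assumed name)
      pollingInterval: 10           # re-run the query every 10 seconds
      triggers:
        - type: postgresql
          metadata:
            connectionFromEnv: AIRFLOW_CONN_AIRFLOW_DB   # env var holding the DB connection string (assumed)
            query: >-
              SELECT ceil(COUNT(*)::decimal / 16)
              FROM task_instance
              WHERE state IN ('running', 'queued')
            targetQueryValue: "1"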

The easiest way to get started with Apache Airflow® 2.0 is the Astronomer CLI. You can get up and running with Airflow by following our Quickstart Guide.
