WEBINARS

Airflow 2.0 + Kubernetes

Watch Video On Demand

Hosted By

  • Daniel Imberman
  • Viraj Parekh

Note: This webinar was recorded in April 2021 and many exciting features have been added to Airflow and Astro since then. We recommend you also check out our in-depth guide on how to Use the KubernetesPodOperator. Astronomer customers can learn more about the KubernetesExecutor and the KubernetesPodOperator on Astro in our documentation.

Topics that will be discussed:

Kubernetes Executor

Old Architecture

airflowKubernetesOldArc

New Architecture

airflow-k8-2

pod_template_file

    apiVersion: v1
    kind: Pod
    metadata:
      name: dummy-name
    spec:
      containers:
        - env:
            - name: AIRFLOW__CORE__EXECUTOR
              value: LocalExecutor
            # Hard Coded Airflow Envs
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: RELEASE-NAME-fernet-key
                  key: fernet-key
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              ….
            # Extra env
          image: apache/airflow:2.0.0
          imagePullPolicy: IfNotPresent
          name: base
          ports: []
          volumeMounts:
            - mountPath: "/opt/airflow/logs"
              name: airflow-logs
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
      hostNetwork: false
      restartPolicy: Never
      securityContext:
        runAsUser: 50000
        fsGroup: 0
      serviceAccountName: 'RELEASE-NAME-worker'
      volumes:

executor_config

    volume_task = PythonOperator(
      task_id="task_with_volume",
      python_callable=test_volume_mount,
      executor_config={
          "pod_override": k8s.V1Pod(
              spec=k8s.V1PodSpec(
                  containers=[
                      k8s.V1Container(
                          name="base",
                          volume_mounts=[
                              k8s.V1VolumeMount(
                                  mount_path="/foo/", 
                                  name="test-volume"
                              )
                          ],
                      )
                  ],
                  volumes=[
                      k8s.V1Volume(
                          name="test-volume",
                          host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                      )
                  ],
              )
          ),
      },
    )

You can even point to a custom pod_template_file and then add overrides on top of it!

    task_with_template = PythonOperator(
      task_id="task_with_template",
      python_callable=print_stuff,
      executor_config={
          "pod_template_file": os.path.join(
              AIRFLOW_HOME, "pod_templates/basic_template.yaml"
          ),
          "pod_override": k8s.V1Pod(
              metadata=k8s.V1ObjectMeta(labels={"release": "stable"})
          ),
      },
    )

KubernetesPodOperator

KubernetesPodOperator (KPO) is now in the cncf.kubernetes Provider. Visit the KubernetesPodOperator page of the Astronomer Registry to learn more.

airflow-k8-3

KPO Now Directly uses Kubernetes API

    volume = k8s.V1Volume(
      name='test-volume',
      persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
    )

    volume_mounts = [
      k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
    ]

    env_vars = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]

    k = KubernetesPodOperator(
      task_id="task" + self.get_current_task_name(),
      in_cluster=False,
      volume_mounts = volume_mounts,
      volumes=[volume],
      env=env_vars,
      do_xcom_push=True,
    )

KPO now also allows templates

    template_path = '/airflow/dags/basic_pod.yaml'
    pod_spec = k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(
            labels={"foo": "bar", "fizz": "buzz"},
        ),
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    env=[k8s.V1EnvVar(name="env_name", value="value")],
                )
            ]
        ),
    )

    env_vars = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]
    k = KubernetesPodOperator(
        task_id="task" + self.get_current_task_name(),
        in_cluster=False,
        pod_template_file=template_path,
        full_pod_spec=pod_spec,
        env=env_vars,
        do_xcom_push=True,
    )

KEDA Autoscaler

With KubernetesExecutor for every single task you launch, Airflow speaks to the Kubernetes API and launches a pod for that task and runs that pod to completion. This works fantastic for small to medium scale use cases. For really large scale cases, with thousands of tasks at a time, the Kubernetes Executor can become unwieldy.

airflow-k8-4

The KEDA Autoscaler allows you to create custom autoscalers. At Astronomer we created a PostgreSQL autoscaler and donated it back to the KEDA project.

airflow-k8-5

The easiest way to get started with Apache Airflow 2.0 is by using the Astronomer CLI. To make it easy you can get up and running with Airflow by following our Quickstart Guide.

Ready to Get Started?

See how your team can fuel its data workflows with more power and less complexity than ever before.

Start Free Trial →

Which plan works best for your team?

Learn about pricing →

What can Astronomer do for your organization?

Talk to an expert →