
Airflow 2.0 + Kubernetes

Watch On Demand

Hosted By

  • Daniel Imberman, Strategy Engineer, Astronomer
  • Viraj Parekh, Field CTO, Astronomer

Topics discussed in this webinar:

  • Kubernetes Executor
  • Kubernetes Pod Operator
  • KEDA Autoscaler

## Kubernetes Executor

  • Each Airflow task is launched as a pod
  • Workers scale to zero
  • Exposes the Kubernetes API to data engineers so they have more control over the resources of each task
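
A minimal way to turn this on, assuming an Airflow 2.0 deployment, is a one-line change in airflow.cfg (or the equivalent AIRFLOW__CORE__EXECUTOR environment variable):

    [core]
    # Run each Airflow task as its own Kubernetes pod
    executor = KubernetesExecutor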

### Old Architecture

  • Attempted to abstract the Kubernetes API for “simplicity”
  • Result: lots of PRs to expose Kubernetes features, lots of code to maintain, and lots of steps before a pod is launched
  • Goal: offer the flexibility of the Kubernetes API while reducing Airflow’s code footprint

### New Architecture

  • At every step along the way, users have access to the Kubernetes models.V1Pod API
  • Merging steps is much easier, faster, and more stable
  • Removed 4k lines of code (!)

### pod_template_file

  • Infrastructure engineers can now define default pod layouts in YAML or JSON files
  • A default pod_template_file can be set in airflow.cfg (see the config snippet after the template below):
    apiVersion: v1
    kind: Pod
    metadata:
      name: dummy-name
    spec:
      containers:
        - env:
            - name: AIRFLOW__CORE__EXECUTOR
              value: LocalExecutor
            # Hard Coded Airflow Envs
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: RELEASE-NAME-fernet-key
                  key: fernet-key
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              ….
            # Extra env
          image: apache/airflow:2.0.0
          imagePullPolicy: IfNotPresent
          name: base
          ports: []
          volumeMounts:
            - mountPath: "/opt/airflow/logs"
              name: airflow-logs
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
      hostNetwork: false
      restartPolicy: Never
      securityContext:
        runAsUser: 50000
        fsGroup: 0
      serviceAccountName: 'RELEASE-NAME-worker'
      volumes:
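
With a template like the one above saved where the scheduler and workers can read it, pointing Airflow at it is a single setting in airflow.cfg (the section name is correct for Airflow 2.0; the path is illustrative):

    [kubernetes]
    # Default pod template used by every KubernetesExecutor task
    pod_template_file = /opt/airflow/pod_templates/pod_template.yaml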

### executor_config

  • The new “pod_override” key accepts a k8s.V1Pod object instead of a dictionary
  • You can now use the official Kubernetes API reference when building the pod spec
  • Add sidecars, secrets, affinities, etc., as in the example below:

    from airflow.operators.python import PythonOperator
    from kubernetes.client import models as k8s

    # test_volume_mount is a Python callable defined elsewhere in the DAG file
    volume_task = PythonOperator(
        task_id="task_with_volume",
        python_callable=test_volume_mount,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[
                                k8s.V1VolumeMount(
                                    mount_path="/foo/",
                                    name="test-volume",
                                )
                            ],
                        )
                    ],
                    volumes=[
                        k8s.V1Volume(
                            name="test-volume",
                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                        )
                    ],
                )
            ),
        },
    )

You can even point to a custom pod_template_file and then add overrides on top of it!

    import os

    # print_stuff is a Python callable defined elsewhere; AIRFLOW_HOME is assumed
    # to be set, e.g. AIRFLOW_HOME = os.environ["AIRFLOW_HOME"]
    task_with_template = PythonOperator(
        task_id="task_with_template",
        python_callable=print_stuff,
        executor_config={
            "pod_template_file": os.path.join(
                AIRFLOW_HOME, "pod_templates/basic_template.yaml"
            ),
            "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(labels={"release": "stable"})
            ),
        },
    )

## KubernetesPodOperator

KubernetesPodOperator (KPO) is now in the cncf.kubernetes Provider. Visit the KubernetesPodOperator page of the Astronomer Registry to learn more.

  • KPO is no longer bound to an Airflow version
  • You can get upgrades and bug fixes more often without requiring an Airflow upgrade
  • Backport providers make the new provider available on Airflow 1.10.x ahead of upgrading to 2.0
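
Concretely, the move means installing the provider package and importing KPO from its new location. A sketch of the 2.0-era paths (later provider releases may relocate the module):

    # Airflow 2.0+:
    #   pip install apache-airflow-providers-cncf-kubernetes
    # Airflow 1.10.x (backport provider, same import path):
    #   pip install apache-airflow-backport-providers-cncf-kubernetes
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
        KubernetesPodOperator,
    )

    # Old Airflow 1.10 contrib path, removed in 2.0:
    # from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator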


## KPO Now Directly Uses the Kubernetes API

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
    from kubernetes.client import models as k8s

    volume = k8s.V1Volume(
        name='test-volume',
        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
    )

    volume_mounts = [
        k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
    ]

    env_vars = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]

    k = KubernetesPodOperator(
        task_id="task-with-volume",
        name="task-with-volume",   # pod name, namespace, and image are required
        namespace="default",       # when no template is supplied (illustrative values)
        image="ubuntu:20.04",
        cmds=["bash", "-cx"],
        # write a result file for do_xcom_push to pick up
        arguments=["echo '{}' > /airflow/xcom/return.json"],
        in_cluster=False,
        volume_mounts=volume_mounts,
        volumes=[volume],
        env_vars=env_vars,         # note: the parameter is env_vars, not env
        do_xcom_push=True,
    )

## KPO Now Also Allows Templates

    # imports as in the previous example; the template file is assumed to define
    # the pod's name, namespace, and image
    template_path = '/airflow/dags/basic_pod.yaml'

    pod_spec = k8s.V1Pod(
        metadata=k8s.V1ObjectMeta(
            labels={"foo": "bar", "fizz": "buzz"},
        ),
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    env=[k8s.V1EnvVar(name="env_name", value="value")],
                )
            ]
        ),
    )

    env_vars = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]

    k = KubernetesPodOperator(
        task_id="task-with-template",
        in_cluster=False,
        pod_template_file=template_path,  # base pod definition loaded from YAML
        full_pod_spec=pod_spec,           # overrides merged on top of the template
        env_vars=env_vars,
        do_xcom_push=True,
    )

## KEDA Autoscaler

With the KubernetesExecutor, for every single task you launch, Airflow speaks to the Kubernetes API, launches a pod for that task, and runs that pod to completion. This works fantastically for small- to medium-scale use cases. For really large-scale cases, with thousands of tasks at a time, the KubernetesExecutor can become unwieldy.


The KEDA (Kubernetes Event-Driven Autoscaling) autoscaler allows you to create custom autoscalers driven by external metrics and queries. At Astronomer we created a PostgreSQL scaler and donated it back to the KEDA project.
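
As a rough sketch of what that enables, a KEDA ScaledObject can scale an Airflow worker Deployment off a query against the Airflow metadata database. The deployment name, environment variable, target values, and query below are illustrative, not the exact manifest from the webinar:

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: airflow-worker-scaler
    spec:
      scaleTargetRef:
        name: airflow-worker              # Deployment to scale (illustrative)
      minReplicaCount: 0
      maxReplicaCount: 10
      triggers:
        - type: postgresql
          metadata:
            connectionFromEnv: AIRFLOW_METADATA_CONN  # env var holding the connection string
            # scale workers based on queued and running task instances
            query: "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance WHERE state IN ('running', 'queued')"
            targetQueryValue: "1"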


The easiest way to get started with Apache Airflow 2.0 is with the Astronomer CLI. You can get up and running with Airflow by following our Quickstart Guide.




Get Apache Airflow Certified

If you want to learn more about how to get started with Airflow, you can join the thousands of other data engineers who have received the Astronomer Certification for Apache Airflow Fundamentals. This exam assesses an understanding of the basics of the Airflow architecture and the ability to create simple data pipelines for scheduling and monitoring tasks.

Learn More About Certification