
Topics that will be discussed:
- Kubernetes Executor
- Kubernetes Pod Operator
- KEDA Autoscaler
Kubernetes Executor
- Each Airflow task is launched as a pod
- Workers scale to zero
- Exposes the Kubernetes API to data engineers so they have more control over the resources of each task
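Enabling the executor itself is a one-line change in airflow.cfg (equivalently, the AIRFLOW__CORE__EXECUTOR environment variable):

[core]
executor = KubernetesExecutor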
Old Architecture
- Attempted to abstract Kubernetes API for “simplicity”
- Result: Lots of PRs to expose Kubernetes, lots of code to maintain, lots of steps before a pod is launched
- Goal: Offer the flexibility of the Kubernetes API and reduce Airflow's code footprint
New Architecture
- At every step along the way, users have access to the Kubernetes V1Pod API
- Merging steps is much easier, faster, and more stable
- Removed 4k lines of code(!)
pod_template_file
- Infrastructure engineers can now define default pod layouts in YAML or JSON files
- Can define a default pod_template_file in the airflow.cfg (example template below)
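Pointing Airflow at the template is a single setting in airflow.cfg (the path here is illustrative); a template like the one below is then used as the base for every worker pod:

[kubernetes]
pod_template_file = /opt/airflow/pod_templates/pod_template.yaml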
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          ....
        # Extra env
      image: apache/airflow:2.0.0
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - name: config
          mountPath: "/opt/airflow/airflow.cfg"
          subPath: airflow.cfg
          readOnly: true
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 0
  serviceAccountName: 'RELEASE-NAME-worker'
  volumes:
executor_config
- New “pod_override” object accepts a k8s.V1Pod instead of a dictionary.
- Can now use the official Kubernetes API reference for building the spec
- Add side-cars, secrets, affinities, etc.
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

volume_task = PythonOperator(
    task_id="task_with_volume",
    python_callable=test_volume_mount,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        volume_mounts=[
                            k8s.V1VolumeMount(
                                mount_path="/foo/",
                                name="test-volume",
                            )
                        ],
                    )
                ],
                volumes=[
                    k8s.V1Volume(
                        name="test-volume",
                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                    )
                ],
            )
        ),
    },
)
You can even point to a custom pod_template_file and then add overrides on top of it!
task_with_template = PythonOperator(
    task_id="task_with_template",
    python_callable=print_stuff,
    executor_config={
        "pod_template_file": os.path.join(
            AIRFLOW_HOME, "pod_templates/basic_template.yaml"
        ),
        "pod_override": k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(labels={"release": "stable"})
        ),
    },
)
KubernetesPodOperator
KubernetesPodOperator (KPO) is now in the cncf.kubernetes Provider. Visit the KubernetesPodOperator page of the Astronomer Registry to learn more.
- KPO is no longer bound to an Airflow version
- You can get upgrades and bug fixes more often without requiring an Airflow upgrade
- Backport provider packages let you use the new KPO on Airflow 1.10 while you prepare to upgrade to 2.0
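Concretely, on Airflow 2.0 you install the provider package and import the operator from its new path (package and module names as of the 2.0-era provider):

# pip install apache-airflow-providers-cncf-kubernetes
# (on 1.10, the backport package is apache-airflow-backport-providers-cncf-kubernetes)
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator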
KPO now directly uses the Kubernetes API
volume = k8s.V1Volume(
    name='test-volume',
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
)
volume_mounts = [
    k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
]
env_vars = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]

k = KubernetesPodOperator(
    task_id="task" + self.get_current_task_name(),
    in_cluster=False,
    volume_mounts=volume_mounts,
    volumes=[volume],
    env_vars=env_vars,
    do_xcom_push=True,
)
KPO now also supports pod templates
template_path = '/airflow/dags/basic_pod.yaml'

pod_spec = k8s.V1Pod(
    metadata=k8s.V1ObjectMeta(
        labels={"foo": "bar", "fizz": "buzz"},
    ),
    spec=k8s.V1PodSpec(
        containers=[
            k8s.V1Container(
                name="base",
                env=[k8s.V1EnvVar(name="env_name", value="value")],
            )
        ]
    ),
)
env_vars = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]

k = KubernetesPodOperator(
    task_id="task" + self.get_current_task_name(),
    in_cluster=False,
    pod_template_file=template_path,
    full_pod_spec=pod_spec,
    env_vars=env_vars,
    do_xcom_push=True,
)
KEDA Autoscaler
With the KubernetesExecutor, Airflow speaks to the Kubernetes API for every single task you launch: it creates a pod for that task and runs it to completion. This works fantastically for small- to medium-scale use cases, but at really large scale, with thousands of tasks at a time, the Kubernetes Executor can become unwieldy.
The KEDA Autoscaler allows you to create custom autoscalers. At Astronomer we created a PostgreSQL autoscaler, which scales Celery workers based on the number of queued and running tasks in Airflow's metadata database, and donated it back to the KEDA project.
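A minimal sketch of wiring that scaler up, assuming KEDA v2, a Celery worker Deployment named airflow-worker, a worker concurrency of 16, and an environment variable holding the metadata database connection string (all of these names are illustrative):

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: airflow-worker
spec:
  scaleTargetRef:
    name: airflow-worker                 # illustrative worker Deployment name
  minReplicaCount: 0                     # scale workers to zero when idle
  maxReplicaCount: 10
  triggers:
    - type: postgresql
      metadata:
        connectionFromEnv: AIRFLOW_METADATA_CONN   # hypothetical env var with the DB URI
        # one worker per 16 queued-or-running task instances
        query: "SELECT ceil(COUNT(*)::decimal / 16) FROM task_instance WHERE state IN ('running', 'queued')"
        targetQueryValue: "1"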
The easiest way to get started with Apache Airflow 2.0 is the Astronomer CLI. You can get up and running with Airflow by following our Quickstart Guide.