Launch a Pod in an external cluster

If some of your tasks require specific resources, such as a GPU, you might want to run them in a different cluster than the one hosting your Airflow instance. If both clusters belong to the same AWS, Azure, or GCP account, you can manage access between them with roles and permissions.

To launch Pods in an external cluster from a local Airflow environment, you need valid credentials that authorize your local Airflow environment to launch Pods in that cluster. For managed Kubernetes services from public cloud providers, authentication is federated through the provider's native IAM service. To grant the Astro role permission to launch Pods in your cluster, you can either include static credentials or use workload identity to authorize the Astro role to your cluster.

Prerequisites

Setup

This example shows how to set up an EKS cluster on AWS and run a Pod on it from an Airflow instance where cross-account access is not available.

Step 1: Set up your external cluster

  1. Create an EKS cluster IAM role with a unique name and add the following permission policies:

    • AmazonEKSWorkerNodePolicy
    • AmazonEKS_CNI_Policy
    • AmazonEC2ContainerRegistryReadOnly

    Record the ARN of the new role, as you'll need it in later steps. For a sketch of how to create this role and attach the policies with the AWS CLI, see the example after this list.

  2. Update the trust policy of this new role to include the workload identity of your Deployment. This step ensures that the role can be assumed by your Deployment.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<aws account id>:<your user>",
                    "Service": [
                        "ec2.amazonaws.com",
                        "eks.amazonaws.com"
                    ]
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }
  3. If you don’t already have a cluster, create a new EKS cluster and assign the new role to it.
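
    As a reference, the commands below sketch how you might create the role and attach the policies with the AWS CLI. The role name eks-nodegroup-role and the file trust-policy.json (containing the trust policy shown in step 2) are placeholder names; adjust them to your setup.

    $ aws iam create-role --role-name eks-nodegroup-role \
        --assume-role-policy-document file://trust-policy.json
    $ aws iam attach-role-policy --role-name eks-nodegroup-role \
        --policy-arn arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy
    $ aws iam attach-role-policy --role-name eks-nodegroup-role \
        --policy-arn arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
    $ aws iam attach-role-policy --role-name eks-nodegroup-role \
        --policy-arn arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly

    # print the role ARN to record for later steps
    $ aws iam get-role --role-name eks-nodegroup-role --query 'Role.Arn' --output text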

Step 2: Retrieve the KubeConfig file from the EKS cluster

  1. Use a KubeConfig file to remotely connect to your new cluster. On AWS, you can run the following command to retrieve it:

    $ aws eks --region <your-region> update-kubeconfig --name <cluster-name> --kubeconfig my_kubeconfig.yaml

    This command creates a new KubeConfig file called my_kubeconfig.yaml.

  2. Edit the newly generated KubeConfig so that it instructs the AWS IAM Authenticator for Kubernetes to assume the IAM role you created in Step 1, and confirm that the result matches the file below. Replace <your assume role arn> with the IAM role ARN from Step 1. A quick way to verify the edited file is shown after the example.

    apiVersion: v1
    clusters:
    - cluster:
        certificate-authority-data: <base 64 public certificate>
        server: <Kubernetes API Endpoint>
      name: <arn of your cluster>
    contexts:
    - context:
        cluster: <arn of your cluster>
        user: <arn of your cluster>
      name: <arn of your cluster>
    current-context: <arn of your cluster>
    kind: Config
    preferences: {}
    users:
    - name: <arn of your cluster>
      user:
        exec:
          apiVersion: client.authentication.k8s.io/v1alpha1
          args:
          - --region
          - <your cluster's AWS region>
          - eks
          - get-token
          - --cluster-name
          - <name of your cluster>
          - --role
          - <your assume role arn>
          command: aws
          interactiveMode: IfAvailable
          provideClusterInfo: false
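
    To confirm that the edited KubeConfig works and that the role can be assumed, you can run a quick check with kubectl against the new file (assuming kubectl is installed locally):

    $ kubectl --kubeconfig my_kubeconfig.yaml get nodes

    If the command lists the cluster's nodes (or returns an empty list for a cluster without node groups) instead of an authorization error, the role assumption is configured correctly.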

Step 3: Create a Kubernetes cluster connection

Astronomer recommends creating a Kubernetes cluster connection because it’s more secure than adding an unencrypted kubeconfig file directly to your Astro project.

  1. Convert the kubeconfig configuration you retrieved from your cluster to JSON format (see the sketch after this step for one way to do this).
  2. In either the Airflow UI or the Astro environment manager, create a new connection of the Kubernetes Cluster Connection type. In the Kube config (JSON format) field, paste the JSON-formatted kubeconfig from the previous step.
  3. Click Save.

You can now specify this connection in the configuration of any KubernetesPodOperator task that needs to access your external cluster.
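
One way to do the YAML-to-JSON conversion is with a small Python one-liner. This sketch assumes PyYAML is installed locally; yq or any other YAML-to-JSON tool works just as well:

    $ python -c "import sys, json, yaml; print(json.dumps(yaml.safe_load(sys.stdin)))" < my_kubeconfig.yaml > my_kubeconfig.json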

Step 4: Install the AWS CLI in your Astro environment

To connect to your external EKS cluster, you need to install the AWS CLI in your Astro project.

  1. Add the following to your Dockerfile to install the AWS CLI:

    USER root

    RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
    # Note: if you are testing your pipeline locally, you might need to use an AWS CLI package that matches your local machine's architecture
    RUN unzip awscliv2.zip
    RUN ./aws/install

    USER astro
  2. Add the unzip package to your packages.txt file to make the unzip command available in your Docker container:

    unzip

If you are working locally, you need to restart your Astro project to apply the changes.
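
To confirm the installation in a local environment, one option is to open a shell in a locally running Airflow container with the Astro CLI and check that the aws command is available. This is only a sanity check, not a required step:

    $ astro dev restart
    $ astro dev bash    # opens a shell in a local Airflow container
    $ aws --version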

Step 5: Configure your task

In your KubernetesPodOperator task configuration, ensure that you set cluster_context and namespace for your remote cluster. In the following example, the task launches a Pod in an external cluster based on the configuration defined in the k8s connection.

run_on_EKS = KubernetesPodOperator(
    task_id="run_on_EKS",
    kubernetes_conn_id="k8s",
    cluster_context="<your-cluster-id>",
    namespace="<your-namespace>",
    name="example_pod",
    image="ubuntu",
    cmds=["bash", "-cx"],
    arguments=["echo hello"],
    get_logs=True,
    startup_timeout_seconds=240,
)
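
If you are unsure which value to use for cluster_context, you can list the contexts defined in your KubeConfig file. With the EKS KubeConfig generated above, the context name is typically the cluster ARN:

    $ kubectl config get-contexts --kubeconfig my_kubeconfig.yaml
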
Example dag

The following dag uses several classes from the Amazon provider package to dynamically spin up and delete Pods for each task in a newly created node group. If your remote Kubernetes cluster already has a node group available, you only need to define the KubernetesPodOperator task itself.

The example dag contains 5 consecutive tasks:

  • Create a node group according to the user’s specifications (in this example, a node group with GPU resources).
  • Use a sensor to check that the node group is active.
  • Use the KubernetesPodOperator to run any valid Docker image in a Pod on the newly created node group on the remote cluster. The example dag uses the standard Ubuntu image to print “hello” to the console using a bash command.
  • Delete the node group.
  • Verify that the node group has been deleted.

# import DAG object and utility packages
from airflow import DAG
from pendulum import datetime

# import the KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# import EKS related packages from the Amazon Provider
from airflow.providers.amazon.aws.hooks.eks import EksHook, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
    EksCreateNodegroupOperator,
    EksDeleteNodegroupOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EksNodegroupStateSensor


# custom class to create a node group with nodes on EKS
class EksCreateNodegroupWithNodesOperator(EksCreateNodegroupOperator):
    def execute(self, context):
        # instantiate an EksHook using the AWS connection
        eks_hook = EksHook(
            aws_conn_id=self.aws_conn_id,
            region_name=self.region,
        )

        # define the node group to create
        eks_hook.create_nodegroup(
            clusterName=self.cluster_name,
            nodegroupName=self.nodegroup_name,
            subnets=self.nodegroup_subnets,
            nodeRole=self.nodegroup_role_arn,
            scalingConfig={"minSize": 1, "maxSize": 1, "desiredSize": 1},
            diskSize=20,
            instanceTypes=["g4dn.xlarge"],
            amiType="AL2_x86_64_GPU",  # get GPU resources
            updateConfig={"maxUnavailable": 1},
        )


# instantiate the DAG
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule="@daily",
    dag_id="KPO_remote_EKS_cluster_example_dag",
) as dag:
    # task 1 creates the node group
    create_gpu_nodegroup = EksCreateNodegroupWithNodesOperator(
        task_id="create_gpu_nodegroup",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        nodegroup_subnets=["<your subnet>", "<your subnet>"],
        nodegroup_role_arn="<arn of your EKS role>",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 2 checks that the node group is up and running
    check_nodegroup_status = EksNodegroupStateSensor(
        task_id="check_nodegroup_status",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        mode="reschedule",
        timeout=60 * 30,
        exponential_backoff=True,
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 3 runs a task with the KubernetesPodOperator
    # here, cluster_context and kubernetes_conn_id are defined at the task level
    run_on_EKS = KubernetesPodOperator(
        task_id="run_on_EKS",
        cluster_context="<arn of your cluster>",
        namespace="airflow-kpo-default",
        name="example_pod",
        image="ubuntu",
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        get_logs=True,
        in_cluster=False,
        kubernetes_conn_id="k8s",
        startup_timeout_seconds=240,
    )

    # task 4 deletes the node group
    delete_gpu_nodegroup = EksDeleteNodegroupOperator(
        task_id="delete_gpu_nodegroup",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 5 checks that the node group was deleted successfully
    check_nodegroup_termination = EksNodegroupStateSensor(
        task_id="check_nodegroup_termination",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
        mode="reschedule",
        timeout=60 * 30,
        target_state=NodegroupStates.NONEXISTENT,
    )

    # set the dependencies
    create_gpu_nodegroup >> check_nodegroup_status >> run_on_EKS
    run_on_EKS >> delete_gpu_nodegroup >> check_nodegroup_termination