Deploy dags from an AWS S3 bucket to Astro using AWS Lambda

Use the following CI/CD template to automate deploying Apache Airflow dags from an S3 bucket to Astro using AWS Lambda.

Prerequisites

  • An Astro Deployment.
  • A Workspace or Organization API token with permission to deploy to that Deployment.
  • An AWS S3 bucket.
  • Access to AWS Lambda and permission to create IAM roles in your AWS account.
  • An Astro project and the Astro CLI installed locally, for the initial project deploy.

Dag deploy template

This CI/CD template can be used to deploy dags from a single S3 bucket to a single Astro Deployment. When you create or modify a dag in the S3 bucket, a Lambda function is triggered, initializes an Astro project, and deploys your dags using the Astro CLI.

To deploy any non-dag code changes to Astro, you need to trigger a standard image deploy with your Astro project. When you do this, your Astro project must include the latest version of your dags from your S3 bucket. If the dags folder in your Astro project isn’t up to date with the dags folder in your S3 bucket when you trigger this deploy, your dags revert to the versions hosted in your Astro project.
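One way to keep the two in sync before an image deploy is a small download script. The following is a minimal sketch, assuming AWS credentials are configured locally and <bucket_name> is replaced with your bucket; it mirrors the download logic used in the Lambda function later in this template.

    # Minimal sketch: refresh the local dags/ folder from S3 before an image deploy.
    # Assumes AWS credentials are configured and <bucket_name> is replaced.
    import os
    import boto3

    BUCKET = "<bucket_name>"

    bucket = boto3.resource("s3").Bucket(BUCKET)
    for obj in bucket.objects.filter(Prefix="dags/"):
        if obj.key.endswith("/"):
            continue  # skip folder placeholder objects
        target = os.path.join("dags", os.path.relpath(obj.key, "dags/"))
        os.makedirs(os.path.dirname(target) or ".", exist_ok=True)
        bucket.download_file(obj.key, target)

Run this from the root of your Astro project before you trigger the image deploy.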
  1. Download the latest Astro CLI binary from GitHub releases, then rename the file to astro_cli.tar.gz. For example, to use Astro CLI version 1.13.0 in your template, download astro_1.13.0_linux_amd64.tar.gz and rename it to astro_cli.tar.gz.

  2. In your S3 bucket, create the following new folders:

    • dags
    • cli_binary
  3. Add astro_cli.tar.gz to the cli_binary folder. For a scripted version of steps 1 through 3, see the CLI staging sketch after these steps.

  4. In the AWS IAM console, create a new role for AWS Lambda with the following permissions. Replace <account_id>, <lambda_function_name>, and <bucket_name> with your values.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "lambdacreateloggroup",
          "Effect": "Allow",
          "Action": "logs:CreateLogGroup",
          "Resource": "arn:aws:logs:us-east-1:<account_id>:*"
        },
        {
          "Sid": "lambdaputlogevents",
          "Effect": "Allow",
          "Action": [
            "logs:CreateLogStream",
            "logs:PutLogEvents"
          ],
          "Resource": [
            "arn:aws:logs:us-east-1:<account_id>:log-group:/aws/lambda/<lambda_function_name>:*"
          ]
        },
        {
          "Sid": "bucketpermission",
          "Effect": "Allow",
          "Action": [
            "s3:GetObject",
            "s3:ListBucket"
          ],
          "Resource": [
            "arn:aws:s3:::<bucket_name>",
            "arn:aws:s3:::<bucket_name>/*"
          ]
        }
      ]
    }
  5. Author a new AWS Lambda function from scratch with the following configurations:

    • Function name: Any
    • Runtime: Python 3.9
    • Architecture: Any
    • Execution role: Click Use an existing role and enter the role you created.
  6. Configure the following Lambda environment variables for your Lambda function:

    • ASTRO_HOME: /tmp
    • ASTRO_API_TOKEN: The value for your Workspace or Organization API token.
    • ASTRO_DEPLOYMENT_ID: Your Deployment ID.

    For production Deployments, Astronomer recommends storing your API credentials in AWS Secrets Manager and referencing them from Lambda, as shown in the Secrets Manager sketch after these steps. See https://docs.aws.amazon.com/lambda/latest/dg/configuration-database.html

  7. Add the following code to lambda_function.py. Replace <bucket_name> with your value.

    import boto3
    import subprocess
    import os
    import tarfile

    BUCKET = os.environ.get("BUCKET", "<bucket_name>")
    s3 = boto3.resource('s3')
    deploymentId = os.environ.get('ASTRO_DEPLOYMENT_ID')

    def untar(filename: str, destination: str) -> None:
        with tarfile.open(filename) as file:
            file.extractall(destination)

    def run_command(cmd: str) -> None:
        p = subprocess.Popen("set -x; " + cmd, shell=True)
        p.communicate()

    def download_to_local(bucket_name: str, s3_folder: str, local_dir: str = None) -> None:
        """
        Download the contents of an S3 folder to a local directory.

        Args:
            bucket_name: the name of the S3 bucket
            s3_folder: the folder path in the S3 bucket
            local_dir: a relative or absolute directory path in the local file system
        """
        bucket = s3.Bucket(bucket_name)
        for obj in bucket.objects.filter(Prefix=s3_folder):
            target = obj.key if local_dir is None \
                else os.path.join(local_dir, os.path.relpath(obj.key, s3_folder))
            if not os.path.exists(os.path.dirname(target)):
                os.makedirs(os.path.dirname(target))
            if obj.key[-1] == '/':
                continue
            bucket.download_file(obj.key, target)
            print("downloaded file")

    def lambda_handler(event, context) -> dict:
        """Triggered by an object create event in the S3 bucket.

        :param event: Event payload.
        :param context: Metadata for the event.
        """
        # Lambda functions can only write to /tmp.
        base_dir = '/tmp/astro'
        if not os.path.isdir(base_dir):
            os.mkdir(base_dir)

        # Pull the latest dags and the Astro CLI binary from the bucket.
        download_to_local(BUCKET, 'dags/', f'{base_dir}/dags')
        download_to_local(BUCKET, 'cli_binary/', base_dir)

        os.chdir(base_dir)
        untar('./astro_cli.tar.gz', '.')

        # Initialize a minimal Astro project and deploy only the dags folder.
        run_command('echo y | ./astro dev init')
        run_command(f"./astro deploy {deploymentId} --dags")

        return {"statusCode": 200}
  8. Create a trigger for your Lambda function with the following configuration (for a scripted version, see the trigger configuration sketch after these steps):

    • Source: Select S3.
    • Bucket: Select the bucket that contains your dags directory.
    • Event types: Select All object create events.
    • Prefix: Enter dags/.
    • Suffix: Enter .py.
  9. If you haven’t already, deploy your complete Astro project to your Deployment. See Deploy code.
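CLI staging sketch, for steps 1 through 3: the following downloads an Astro CLI release, renames it to astro_cli.tar.gz, and stages it under cli_binary/ in your bucket. The CLI version, the GitHub release URL pattern, and <bucket_name> are assumptions to adjust for your setup; check the asset name on the astronomer/astro-cli releases page.

    # Sketch of steps 1-3: stage the Astro CLI binary and create the bucket folders.
    # CLI_VERSION, the release URL pattern, and <bucket_name> are assumptions.
    import urllib.request
    import boto3

    CLI_VERSION = "1.13.0"    # example version; use the latest release
    BUCKET = "<bucket_name>"  # replace with your bucket name

    url = (
        "https://github.com/astronomer/astro-cli/releases/download/"
        f"v{CLI_VERSION}/astro_{CLI_VERSION}_linux_amd64.tar.gz"
    )
    urllib.request.urlretrieve(url, "astro_cli.tar.gz")  # download and rename

    s3 = boto3.client("s3")
    s3.put_object(Bucket=BUCKET, Key="dags/")  # zero-byte object shown as the dags folder
    s3.upload_file("astro_cli.tar.gz", BUCKET, "cli_binary/astro_cli.tar.gz")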
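Secrets Manager sketch, for the recommendation in step 6: instead of storing the token directly in a Lambda environment variable, you can resolve it at runtime. The secret name astro/api-token is a placeholder, and the Lambda execution role needs secretsmanager:GetSecretValue on that secret.

    # Secrets Manager sketch: load the Astro API token at runtime.
    # "astro/api-token" is a placeholder secret name.
    import os
    import boto3

    secrets = boto3.client("secretsmanager")

    def load_astro_token() -> None:
        response = secrets.get_secret_value(SecretId="astro/api-token")
        # The Astro CLI reads ASTRO_API_TOKEN from the environment.
        os.environ["ASTRO_API_TOKEN"] = response["SecretString"]

In the Lambda code from step 7, you would call load_astro_token() at the top of lambda_handler, before any astro commands run.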
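Trigger configuration sketch, for step 8: the same trigger can be created with boto3 instead of the console. The function ARN, <account_id>, and <bucket_name> are placeholders, and S3 must be allowed to invoke the function before the notification is attached.

    # Sketch: create the S3 trigger from step 8 programmatically.
    # <account_id>, <bucket_name>, and <lambda_function_name> are placeholders.
    import boto3

    lambda_client = boto3.client("lambda")
    s3_client = boto3.client("s3")

    function_arn = "arn:aws:lambda:us-east-1:<account_id>:function:<lambda_function_name>"

    # Allow S3 to invoke the function before attaching the notification.
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId="s3-invoke-dag-deploy",
        Action="lambda:InvokeFunction",
        Principal="s3.amazonaws.com",
        SourceArn="arn:aws:s3:::<bucket_name>",
    )

    # Fire on object create events under dags/ that end in .py.
    s3_client.put_bucket_notification_configuration(
        Bucket="<bucket_name>",
        NotificationConfiguration={
            "LambdaFunctionConfigurations": [
                {
                    "LambdaFunctionArn": function_arn,
                    "Events": ["s3:ObjectCreated:*"],
                    "Filter": {
                        "Key": {
                            "FilterRules": [
                                {"Name": "prefix", "Value": "dags/"},
                                {"Name": "suffix", "Value": ".py"},
                            ]
                        }
                    },
                }
            ]
        },
    )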

If you stage multiple commits to dag files and push them all at once to your remote branch, the template only deploys dag code changes from the most recent commit. It will miss any code changes made in previous commits.

To avoid this, either push commits individually or configure your repository to Squash commits for pull requests that merge multiple commits simultaneously.

To trigger the template and confirm that it works:

  1. Add your dags to the dags folder in your S3 bucket. To do this from a script, see the sketch that follows these steps.
  2. In the Astro UI, select a Workspace, click Deployments, and then select your Deployment. Confirm that your Lambda function worked by checking the Deployment’s Dag bundle version. The version’s name should include the time that you added the dags to your S3 bucket.
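If you prefer to trigger the pipeline from a script instead of the AWS console, uploading a file under the dags/ prefix is enough to fire the Lambda function. A minimal sketch, assuming a local example_dag.py and the placeholder bucket name:

    # Minimal sketch: upload a dag to the dags/ prefix, which fires the S3 trigger.
    # example_dag.py and <bucket_name> are placeholders.
    import boto3

    s3 = boto3.client("s3")
    s3.upload_file("example_dag.py", "<bucket_name>", "dags/example_dag.py")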