
Clean up and delete task metadata from Airflow DB

You can run a cleanup job to automatically delete task and DAG metadata from your Deployment. This job runs a custom Astronomer cleanup script for all of your Deployments and exports the results for each Deployment as CSV files to your configured external storage service.

The Houston API GraphQL query cleanupAirflowDb triggers the Airflow metadata cleanup job. You can enable this query by setting the config flag astronomer.houston.cleanupAirflowDb.enabled to true, which runs the cleanup job on a regular schedule using a Kubernetes CronJob. If you want to run a cleanup job immediately, you can trigger it manually with the Houston API cleanupAirflowDb query.

danger

The cleanup job deletes any data that's older than the number of days specified in your olderThan configuration. Ensure that none of your historical data is required to run current DAGs or tasks before enabling this feature.

Prerequisites

  • System admin user privileges
  • External storage credentials that allow read/write permissions to your storage

Step 1: Configure your external storage credentials

Using Google Cloud Storage

  1. Provision a GCP Service Account with appropriate read/write permissions to your bucket. Export these credentials as a JSON file.

  2. Create a Kubernetes secret in your Astronomer platform namespace with a name such as astronomer-gcs-keyfile. Then, run the following commands to update your environment:

kubectl annotate secret astronomer-gcs-keyfile "astronomer.io/commander-sync"="platform=astronomer" -n <your-platform-namespace>

kubectl create job runconfigsyncer-job-001 --from=cronjob/astronomer-config-syncer -n <your-platform-namespace>
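
If you have not yet created the astronomer-gcs-keyfile secret itself, a minimal sketch of doing so is shown below. The keyfile path and the secret key name are assumptions; adjust them for your environment.

kubectl create secret generic astronomer-gcs-keyfile --from-file=astronomer-gcs-keyfile=/path/to/your/gcs-keyfile.json -n <your-platform-namespace>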

You use this Kubernetes secret to configure providerEnvSecretName when you configure the cleanup job and env.name when you set the storage provider secret.

(Optional) Configure a connection ID

If you want to run cleanup jobs for specific Deployments or within a Workspace, or trigger jobs manually using an API query, you can configure an Airflow connection to your external storage service and store it as an environment variable. When configuring your connection, use the service account credentials to authenticate to your storage service.

  1. Provision a GCP Service Account with appropriate read/write permissions to your bucket. Export these credentials as a JSON file.

  2. Create an Airflow connection using these credentials. See Airflow documentation to learn how to configure your connection.
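
For example, one way to create this connection is with the Airflow CLI. The following is only a sketch: the connection ID gcs_cleanup is a placeholder, and the exact extra fields depend on the version of the Google provider installed in your Airflow image, so check the provider documentation before using it.

airflow connections add 'gcs_cleanup' \
    --conn-type 'google_cloud_platform' \
    --conn-extra '{"key_path": "/path/to/your/gcs-keyfile.json"}'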

caution

This strategy is not secure because the secret is stored in base64-encoded format in your config.yaml, which can be decoded.

You can use this connection as the connectionId when you trigger cleanup jobs with API queries, but it is not required.

Step 2: Configure the cleanup job

The cronjob configuration provides the default values that your cleanup job uses whether you run a scheduled or manual cleanup job.

The following example shows an automatic cleanup job configuration that runs daily at 5:23 AM and cleans up metadata that is more than one year old.

astronomer:
  houston:
    cleanupAirflowDb:
      # Enable cleanup CronJob
      enabled: true

      # Default run is at 5:23 every morning https://crontab.guru/#23_5_*_*_*
      schedule: "23 5 * * *"

      # Clean up metadata older than this many days
      olderThan: 365

      # Output path of archived data CSV export
      outputPath: "/tmp"

      # Delete archived tables
      dropArchives: true

      # If set to true, prints the Deployments that would be cleaned up and skips the actual cleanup
      dryRun: false

      # Name of the file storage provider. Supported providers: gcp/azure/aws/local
      provider: <gcp/azure/aws/local>

      # Name of the provider bucket or local file path
      bucketName: "/tmp"

      # The name of the Kubernetes Secret containing your cloud provider connection secret
      providerEnvSecretName: "<your-secret-name>"

      # Run cleanup on a specific table or a comma-separated list of tables
      tables: "callback_request,celery_taskmeta,celery_tasksetmeta,dag,dag_run,dataset_event,import_error,job,log,session,sla_miss,task_fail,task_instance,task_reschedule,trigger,xcom"

Step 3: Set the storage provider secret for your scheduler

In the Houston config section of your values.yaml file, set the storage provider secret that you configured in Step 1, so that the cleanup job can export your cleanup results to your cloud storage.

The env.name value must match the secret name that you configured for providerEnvSecretName in your values.yaml file.

astronomer:
  houston:
    deployments:
      helm:
        airflow:
          scheduler:
            extraVolumes:
              - name: dbcleanup
                secret:
                  defaultMode: 420
                  optional: true
                  secretName: astronomer-gcs-keyfile
            extraVolumeMounts:
              - mountPath: /tmp/creds/astronomer-gcs-keyfile
                name: dbcleanup
                subPath: astronomer-gcs-keyfile
            env:
              - name: GCP_PASS
                value: /tmp/creds/astronomer-gcs-keyfile
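
The scheduler can only mount astronomer-gcs-keyfile if that secret exists in the Deployment namespace, which is what the annotation and config-syncer job from Step 1 set up. As a quick sanity check, assuming the secret name used above:

kubectl get secret astronomer-gcs-keyfile -n <your-airflow-namespace>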

Step 4: Apply your configuration

Apply your platform configuration changes to enable cleanup jobs and to set your cronjob schedule.


helm upgrade <your-platform-release-name> astronomer/astronomer -f <your-updated-config-yaml-file> -n <your-platform-namespace> --set astronomer.houston.upgradeDeployments.enabled=false
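
After the upgrade completes, you can confirm that the cleanup CronJob exists in your platform namespace. The exact CronJob name depends on your release name, so the following sketch simply lists all CronJobs:

kubectl get cronjobs -n <your-platform-namespace>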

tip

If you want to upgrade all Deployments while updating your configuration, you can set astronomer.houston.upgradeDeployments.enabled to true.

Step 5: (Optional) Manually trigger the cleanup job

The following configuration enables you to trigger a cleanup job manually using a Houston API query. When you use the cleanup job in this way, the values you include in the query are used instead of the defaults set in the values.yaml configuration. This means you must specify the Deployment or Workspace in your query that you want to clean up.

astronomer:
  config:
    deployments:
      cleanupAirflowDb:
        enabled: true
Restrict cleanup to manual-only triggers

In Step 2, you set an automatic schedule for your platform to clean up task metadata by setting the astronomer.houston.cleanupAirflowDb.enabled configuration to true. To allow only manually triggered cleanup jobs, set astronomer.houston.cleanupAirflowDb.enabled to false instead. Manually triggered cleanup jobs require you to use a Houston API query and specify the Deployments where you want to archive metadata.

The following examples show different queries that you can use depending on your needs. See Houston API examples for all examples and scenarios that you can use to work with the Houston API.

Houston API Parameters

| Name | Type | Description |
| --- | --- | --- |
| olderThan | Int | Clean up data in Deployments that is older than the number of days defined in this parameter. |
| dryRun | Bool | When set to true, the job does not make changes; it only logs which data would be cleaned up. If unspecified, defaults to false. |
| outputPath | String | The path in your storage bucket or local storage where the job saves the archived CSV data. |
| dropArchives | Bool | If true, deletes any previously archived tables after export. Use with caution. Set to false by default. |
| provider | String | The cloud provider you use for archiving. Supported values: aws, azure, gcp, local. If unspecified, defaults to local. |
| bucketName | String | Name of the cloud storage bucket or local directory where the job saves the archived CSV export. |
| providerEnvSecretName | String | Name of the Kubernetes Secret that contains the credentials or config for the storage provider, if you used a Kubernetes secret. |
| deploymentIds | String | List of the specific Deployment IDs to target for cleanup. |
| workspaceId | String | Restricts cleanup to Deployments within the configured Workspace. |
| tables | String | Comma-separated list of tables to target for cleanup. If you do not configure this parameter, all supported tables are cleaned. |
| connectionId | String | (Optional) Airflow connection ID used for accessing the underlying data warehouse. Can be left empty if no connection is defined. |
tip

Set dryRun: true to test this feature without deleting any data. When dry runs are enabled, the cleanup job only prints the data that it plans to modify to the webserver Pod's output. To view the dryRun events of the cleanup job, check the logs of your webserver Pod for each Deployment.
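
For example, a sketch of checking those logs with kubectl, assuming the standard <release-name>-webserver Deployment name used by the Airflow chart:

kubectl logs deployment/<release-name>-webserver -n <your-airflow-namespace> | grep -i cleanup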

Scenario 1: Cleanup Deployments per Workspace

You can use the following query to clean up Deployments in a specific Workspace. Configure the workspaceId parameter with the Workspace whose Deployments you want to clean up. You can also find Workspace IDs with the sysWorkspaces Houston API query.

query cleanupAirflowDb(
  $olderThan: Int!
  $dryRun: Boolean!
  $outputPath: String!
  $dropArchives: Boolean!
  $provider: String!
  $bucketName: String!
  $providerEnvSecretName: String!
  $deploymentIds: [Id]
  $workspaceId: Uuid
  $tables: String!
  $connectionId: String
) {
  cleanupAirflowDb(
    olderThan: $olderThan
    dryRun: $dryRun
    outputPath: $outputPath
    dropArchives: $dropArchives
    provider: $provider
    bucketName: $bucketName
    providerEnvSecretName: $providerEnvSecretName
    workspaceId: $workspaceId
    tables: $tables
    connectionId: $connectionId
  )
}

The following example shows configured query variables that clean up metadata older than 1 day from all Deployments in the Workspace cma40n66l000008l89nye86o1.

{
  "olderThan": 1,
  "dryRun": true,
  "outputPath": "",
  "dropArchives": true,
  "provider": "gcp",
  "bucketName": "",
  "connectionId": "",
  "tables": "callback_request,celery_taskmeta,celery_tasksetmeta,dag,dag_run,dataset_event,import_error,job,log,session,sla_miss,task_fail,task_instance,task_reschedule,trigger,xcom",
  "providerEnvSecretName": "GCP_PASS",
  "workspaceId": "cma40n66l000008l89nye86o1"
}
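
To run this query outside of the Houston API playground, you can POST it to the Houston GraphQL endpoint. The following sketch assumes a Houston endpoint at houston.<your-base-domain>/v1, a System Admin API token, and a query.json file that wraps the query and variables above in the standard GraphQL request shape ({"query": "...", "variables": {...}}); adjust these details for your installation.

curl -X POST https://houston.<your-base-domain>/v1 \
    -H "Authorization: <your-api-token>" \
    -H "Content-Type: application/json" \
    --data @query.json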

Scenario 2: Cleanup Deployments across your system

You can use the following query to clean up specific Deployments in a specific Workspace. Configure the workspaceId parameter with the Workspace and the deploymentIds parameter with the specific Deployments. You can also find Workspace IDs with the sysWorkspaces Houston API query.

query cleanupAirflowDb(
  $olderThan: Int!
  $dryRun: Boolean!
  $outputPath: String!
  $dropArchives: Boolean!
  $provider: String!
  $bucketName: String!
  $providerEnvSecretName: String!
  $deploymentIds: [Id]
  $workspaceId: Uuid
  $tables: String!
  $connectionId: String
) {
  cleanupAirflowDb(
    olderThan: $olderThan
    dryRun: $dryRun
    outputPath: $outputPath
    dropArchives: $dropArchives
    provider: $provider
    bucketName: $bucketName
    providerEnvSecretName: $providerEnvSecretName
    deploymentIds: $deploymentIds
    tables: $tables
    connectionId: $connectionId
  )
}

The following example shows configured query variables that clean up metadata older than 1 day from the specified Deployments in the Workspace cma42z570000008l8f6rpc72f.

{
  "olderThan": 1,
  "dryRun": true,
  "outputPath": "",
  "dropArchives": true,
  "provider": "gcp",
  "bucketName": "",
  "connectionId": "",
  "tables": "callback_request,celery_taskmeta,celery_tasksetmeta,dag,dag_run,dataset_event,import_error,job,log,session,sla_miss,task_fail,task_instance,task_reschedule,trigger,xcom",
  "providerEnvSecretName": "GCP_PASS",
  "deploymentIds": ["cma42zc67000108l89eb37iy5","cma42zjdp000208l8g16ygm6m"],
  "workspaceId": "cma42z570000008l8f6rpc72f"
}

Scenario 3: Clean up Deployment using Airflow connection ID

caution

Requires configuring an Airflow Connection ID, connectionId, from the Airflow UI or CLI.

You can clean up all Deployments in a Workspace or specific Deployments, and export the cleanup logs to a storage provider configured in an Airflow connection.

query cleanupAirflowDb(
  $olderThan: Int!
  $dryRun: Boolean!
  $outputPath: String!
  $dropArchives: Boolean!
  $provider: String!
  $bucketName: String!
  $providerEnvSecretName: String!
  $deploymentIds: [Id]
  $workspaceId: Uuid
  $tables: String!
  $connectionId: String
) {
  cleanupAirflowDb(
    olderThan: $olderThan
    dryRun: $dryRun
    outputPath: $outputPath
    dropArchives: $dropArchives
    provider: $provider
    bucketName: $bucketName
    deploymentIds: $deploymentIds
    tables: $tables
    connectionId: $connectionId
  )
}

The following code example cleans up metadata older than 1 day from the specified Deployments and exports the logs to the storage provider defined in the connectionId.

{
  "olderThan": 1,
  "dryRun": true,
  "outputPath": "",
  "dropArchives": true,
  "provider": "gcp",
  "bucketName": "",
  "connectionId": "<airflow_connection_id>",
  "tables": "callback_request,celery_taskmeta,celery_tasksetmeta,dag,dag_run,dataset_event,import_error,job,log,session,sla_miss,task_fail,task_instance,task_reschedule,trigger,xcom",
  "deploymentIds": ["cm6q3jpn61741517mhonzgcgz7","cm6q3jpn61741517mhonzgcgz7"],
  "workspaceId": "cm5nj9wly007617iox80beute"
}

Access your cleanup logs

You can access your cleanup logs through the UI or with your Pod logs.

Pod Logs

You can access the Pod logs for the <release-name>-meta-cleanup-job Pod in the Airflow namespace using Vector sidecar logging or FluentD.
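
For example, a sketch of finding and reading the cleanup job Pod logs with kubectl; the grep pattern and namespace placeholder are assumptions based on the <release-name>-meta-cleanup-job naming above:

kubectl get pods -n <your-airflow-namespace> | grep meta-cleanup
kubectl logs <meta-cleanup-job-pod-name> -n <your-airflow-namespace>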

UI access

Go to the Logs tab on your Deployment page and select the AirflowMetaCleanup tab to access the logs. UI access is only supported for FluentD at this time.
