Clean up and delete task metadata from Airflow DB
You can run a cleanup job to automatically delete task and DAG metadata from your Deployments. This job runs an Astronomer custom cleanup script for all of your Deployments and exports the results for each Deployment as CSV files to your configured external storage service.
The Houston API GraphQL query cleanupAirflowDb triggers the Airflow metadata cleanup job. You can enable this query by setting the config flag astronomer.houston.cleanupAirflowDb.enabled to true. Then, you can run your cleanup job as a regularly scheduled Kubernetes CronJob. If you want to run a cleanup job immediately, you can trigger it manually with the Houston API cleanupAirflowDb query.
The cleanup job deletes any data that's older than the number of days specified in your olderThan configuration. Ensure that none of your historical data is required to run current DAGs or tasks before enabling this feature.
Prerequisites
- System admin user privileges
- External storage credentials that allow read/write permissions to your storage
Step 1: Configure your external storage credentials
Using Google Cloud Storage
1. Provision a GCP service account with read/write permissions to your bucket, and export its credentials as a JSON key file.
2. Create a Kubernetes secret in your Astronomer platform namespace with a name such as astronomer-gcs-keyfile. (A sketch of one way to create the secret follows the commands below.) Then, run the following commands to sync it to your environment:
kubectl annotate secret astronomer-gcs-keyfile "astronomer.io/commander-sync"="platform=astronomer"
kubectl create job --from=cronjob/astronomer-config-syncer runconfigsyncer-job-001
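If you haven't already created the secret, the following is a minimal sketch of creating it from the exported service account key. The file name gcp-keyfile.json and the namespace placeholder are examples, not values required by this guide:
# Create the Kubernetes secret from your exported GCP service account key file
kubectl create secret generic astronomer-gcs-keyfile \
  --from-file=astronomer-gcs-keyfile=./gcp-keyfile.json \
  -n <your-platform-namespace>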
You use this Kubernetes secret to configure providerEnvSecretName when you configure the cleanup job and env.name when you set the storage provider secret.
(Optional) Configure a connection ID
If you want to run jobs for specific Deployments or within a Workspace, or run manually triggered jobs using an API query, you can configure an Airflow connection to your external storage service so that it can be stored as an environment variable. You must use the service account credentials to authenticate to your service when you configure the connection.
1. Provision a GCP service account with read/write permissions to your bucket, and export its credentials as a JSON file.
2. Create an Airflow connection using these credentials. See the Airflow documentation to learn how to configure your connection.
This strategy is not secure because the secret is stored in base64-encoded format in your config.yaml, which can be decoded.
You can use this connection as your connectionId when you make API queries to trigger the cleanup job, but it is not required.
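As an illustration only, the following sketch adds such a connection with the Airflow CLI. The connection ID gcs_cleanup_conn and the key file path are placeholders, and the exact extra fields depend on your Google provider version:
# Add a GCP connection that authenticates with a service account key file
airflow connections add gcs_cleanup_conn \
  --conn-type google_cloud_platform \
  --conn-extra '{"key_path": "/tmp/creds/astronomer-gcs-keyfile"}'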
Step 2: Configure the cleanup job
The CronJob configuration provides the default values that your cleanup job uses whether you run a scheduled or manual cleanup job.
The following example shows an automatic cleanup job configuration that runs at 5:23 AM every day and archives data older than one year.
astronomer:
  houston:
    cleanupAirflowDb:
      # Enable the cleanup CronJob
      enabled: true
      # Default run is at 5:23 every morning https://crontab.guru/#23_5_*_*_*
      schedule: "23 5 * * *"
      # Clean up data older than this many days
      olderThan: 365
      # Output path of the archived data CSV export
      outputPath: "/tmp"
      # Delete archived tables
      dropArchives: true
      # If set to true, prints out the Deployments that would be cleaned up and skips the actual cleanup
      dryRun: false
      # Name of the file storage provider, supported providers - gcp/azure/aws/local
      provider: <gcp/azure/aws/local>
      # Name of the provider bucket or local file path
      bucketName: "/tmp"
      # The name of the Kubernetes Secret containing your cloud provider connection secret
      providerEnvSecretName: "<your-secret-name>"
      # Run cleanup on a specific table or list of tables in comma-separated format
      tables: "callback_request,celery_taskmeta,celery_tasksetmeta,dag,dag_run,dataset_event,import_error,job,log,session,sla_miss,task_fail,task_instance,task_reschedule,trigger,xcom"
Step 3: Set the storage provider secret for your scheduler
In the Houston config section of your values.yaml file, set the storage provider secret that you configured in Step 1 so that the cleanup job can export your cleanup results to your cloud storage.
The env.name value must match the secret name that you configured for providerEnvSecretName in your values.yaml file.
astronomer:
  houston:
    deployments:
      helm:
        airflow:
          scheduler:
            extraVolumes:
              - name: dbcleanup
                secret:
                  defaultMode: 420
                  optional: true
                  secretName: astronomer-gcs-keyfile
            extraVolumeMounts:
              - mountPath: /tmp/creds/astronomer-gcs-keyfile
                name: dbcleanup
                subPath: astronomer-gcs-keyfile
            env:
              - name: GCP_PASS
                value: /tmp/creds/astronomer-gcs-keyfile
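After your scheduler restarts, you can optionally confirm that the key file is mounted and the environment variable is set. The namespace and Pod name below are placeholders for your own Deployment:
# Find a scheduler Pod in your Deployment's namespace
kubectl get pods -n <your-deployment-namespace> | grep scheduler
# Confirm the environment variable and the mounted key file
kubectl exec -n <your-deployment-namespace> <scheduler-pod-name> -- printenv GCP_PASS
kubectl exec -n <your-deployment-namespace> <scheduler-pod-name> -- ls /tmp/creds/astronomer-gcs-keyfile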
Step 4: Apply your configuration
Apply your platform configuration changes to enable cleanup jobs and to set your cronjob schedule.
helm upgrade <your-platform-release-name> astronomer/astronomer -f <your-updated-config-yaml-file> -n <your-platform-namespace> --set astronomer.houston.upgradeDeployments.enabled=false
If you want to upgrade all Deployments while updating your configuration, you can set astronomer.houston.upgradeDeployments.enabled to true.
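After the upgrade completes, you can optionally confirm that the cleanup CronJob exists in your platform namespace. The exact CronJob name depends on your platform release name, so list the CronJobs and look for the cleanup entry:
# List CronJobs in the platform namespace and look for the Airflow DB cleanup job
kubectl get cronjobs -n <your-platform-namespace>
# Optionally trigger a one-off run from the CronJob (replace the name with the one listed above)
kubectl create job --from=cronjob/<cleanup-cronjob-name> cleanup-test-001 -n <your-platform-namespace>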
Step 5: (Optional) Manually trigger the cleanup job
The following configuration enables you to trigger a cleanup job manually using a Houston API query. When you use the cleanup job this way, the values you include in the query are used instead of the defaults set in your values.yaml configuration. This means you must specify the Deployments or Workspace that you want to clean up in your query.
astronomer:
  config:
    deployments:
      cleanupAirflowDb:
        enabled: true
In Step 2, you set an automatic schedule for your platform to clean up task metadata by setting the astronomer.houston.cleanupAirflowDb.enabled configuration to true. To allow only manually triggered cleanup jobs, you must instead set astronomer.houston.cleanupAirflowDb.enabled to false. Manually triggered cleanup jobs require you to use a Houston API query and specify the Deployments where you want to archive metadata.
The following examples show different queries that you can use depending on your needs. See Houston API examples for all the scenarios that you can use to work with the Houston API.
Houston API Parameters
Name | Type | Description |
---|---|---|
olderThan | Int | Clean up data in Deployments that are older than the number of days defined in this parameter. |
dryRun | Bool | When set to true, the job does not make changes; it only logs which data would be cleaned up. If unspecified, defaults to false. |
outputPath | String | The path in your storage bucket or local storage where the job saves the archived CSV data. |
dropArchives | Bool | If true, deletes any previously archived tables after export. Use with caution. Set to false by default. |
provider | String | The cloud provider you use for archiving. Supported values: aws, azure, gcp, local. If unspecified, defaults to local. |
bucketName | String | Name of the cloud storage bucket or local directory where the job saves the archive CSV export. |
providerEnvSecretName | String | Name of the Kubernetes Secret that contains the credentials or config for the storage provider, if you used a Kubernetes secret. |
deploymentIds | String | List of the specific Deployment IDs to target for cleanup. |
workspaceId | String | Restricts cleanup to Deployments within the configured Workspace. |
tables | String | Comma-separated list of tables to target for cleanup. If you do not configure this parameter, all supported tables will be cleaned. |
connectionId | String | (Optional configuration) Airflow connection ID used for accessing the underlying data warehouse. Can be left empty if no connection is defined. |
Set dryRun: true to test this feature without deleting any data. When dry runs are enabled, the cleanup job only prints the data that it plans to modify in the output of the webserver Pod. To view the dryRun events of the cleanup job, check the logs of your webserver Pod for each Deployment.
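The scenarios below show only the GraphQL query and its variables. As one example of submitting them, and assuming the Houston API is reachable at https://houston.<base-domain>/v1 and that $API_TOKEN holds a token accepted by your Houston API, you could save the query and variables together in a request.json file and POST it with curl. The endpoint, token variable, and file name are placeholders:
# request.json contains {"query": "<one of the queries below>", "variables": {<the matching variables>}}
curl -X POST "https://houston.<base-domain>/v1" \
  -H "Content-Type: application/json" \
  -H "Authorization: $API_TOKEN" \
  --data @request.json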
Scenario 1: Clean up Deployments per Workspace
You can use the following query to clean up Deployments in a specific Workspace. Configure the workspaceId parameter with the Workspace whose Deployments you want to clean up. You can also find Workspace IDs with the sysWorkspaces Houston API query.
query cleanupAirflowDb(
  $olderThan: Int!
  $dryRun: Boolean!
  $outputPath: String!
  $dropArchives: Boolean!
  $provider: String!
  $bucketName: String!
  $providerEnvSecretName: String!
  $deploymentIds: [Id]
  $workspaceId: Uuid
  $tables: String!
  $connectionId: String
) {
  cleanupAirflowDb(
    olderThan: $olderThan
    dryRun: $dryRun
    outputPath: $outputPath
    dropArchives: $dropArchives
    provider: $provider
    bucketName: $bucketName
    providerEnvSecretName: $providerEnvSecretName
    workspaceId: $workspaceId
    tables: $tables
    connectionId: $connectionId
  )
}
The following example shows configured query variables that clean up data older than 1 day in all Deployments within the Workspace cma40n66l000008l89nye86o1.
{
  "olderThan": 1,
  "dryRun": true,
  "outputPath": "",
  "dropArchives": true,
  "provider": "gcp",
  "bucketName": "",
  "connectionId": "",
  "tables": "callback_request,celery_taskmeta,celery_tasksetmeta,dag,dag_run,dataset_event,import_error,job,log,session,sla_miss,task_fail,task_instance,task_reschedule,trigger,xcom",
  "providerEnvSecretName": "GCP_PASS",
  "workspaceId": "cma40n66l000008l89nye86o1"
}
Scenario 2: Clean up Deployments across your system
You can use the following query to clean up specific Deployments in a specific Workspace. Configure the workspaceId parameter with the Workspace and the deploymentIds parameter with the specific Deployments. You can also find Workspace IDs with the sysWorkspaces Houston API query.
query cleanupAirflowDb(
  $olderThan: Int!
  $dryRun: Boolean!
  $outputPath: String!
  $dropArchives: Boolean!
  $provider: String!
  $bucketName: String!
  $providerEnvSecretName: String!
  $deploymentIds: [Id]
  $workspaceId: Uuid
  $tables: String!
  $connectionId: String
) {
  cleanupAirflowDb(
    olderThan: $olderThan
    dryRun: $dryRun
    outputPath: $outputPath
    dropArchives: $dropArchives
    provider: $provider
    bucketName: $bucketName
    providerEnvSecretName: $providerEnvSecretName
    deploymentIds: $deploymentIds
    workspaceId: $workspaceId
    tables: $tables
    connectionId: $connectionId
  )
}
The following example shows configured query variables that clean up data older than 1 day in the specified Deployments within the Workspace cma42z570000008l8f6rpc72f.
{
  "olderThan": 1,
  "dryRun": true,
  "outputPath": "",
  "dropArchives": true,
  "provider": "gcp",
  "bucketName": "",
  "connectionId": "",
  "tables": "callback_request,celery_taskmeta,celery_tasksetmeta,dag,dag_run,dataset_event,import_error,job,log,session,sla_miss,task_fail,task_instance,task_reschedule,trigger,xcom",
  "providerEnvSecretName": "GCP_PASS",
  "deploymentIds": ["cma42zc67000108l89eb37iy5", "cma42zjdp000208l8g16ygm6m"],
  "workspaceId": "cma42z570000008l8f6rpc72f"
}
Scenario 3: Clean up Deployments using an Airflow connection ID
This scenario requires configuring an Airflow connection ID, connectionId, from the Airflow UI or CLI.
You can clean up all Deployments in a Workspace or specific Deployments, and export the cleanup logs to a storage provider configured in an Airflow connection.
query cleanupAirflowDb(
  $olderThan: Int!
  $dryRun: Boolean!
  $outputPath: String!
  $dropArchives: Boolean!
  $provider: String!
  $bucketName: String!
  $deploymentIds: [Id]
  $workspaceId: Uuid
  $tables: String!
  $connectionId: String
) {
  cleanupAirflowDb(
    olderThan: $olderThan
    dryRun: $dryRun
    outputPath: $outputPath
    dropArchives: $dropArchives
    provider: $provider
    bucketName: $bucketName
    deploymentIds: $deploymentIds
    workspaceId: $workspaceId
    tables: $tables
    connectionId: $connectionId
  )
}
The following example cleans up data older than 1 day in the specified Deployments and exports the logs to the storage provider defined in the connectionId.
{
  "olderThan": 1,
  "dryRun": true,
  "outputPath": "",
  "dropArchives": true,
  "provider": "gcp",
  "bucketName": "",
  "connectionId": "<airflow_connection_id>",
  "tables": "callback_request,celery_taskmeta,celery_tasksetmeta,dag,dag_run,dataset_event,import_error,job,log,session,sla_miss,task_fail,task_instance,task_reschedule,trigger,xcom",
  "deploymentIds": ["cm6q3jpn61741517mhonzgcgz7", "cm6q3jpn61741517mhonzgcgz7"],
  "workspaceId": "cm5nj9wly007617iox80beute"
}
Access your cleanup logs
You can access your cleanup logs through the UI or with your Pod logs.
Pod Logs
You can access your Pod logs with Vector sidecar logging or FluentD by looking for <release-name>-meta-cleanup-job in the Airflow namespace.
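If you use kubectl directly, a sketch of locating and reading those logs follows. The namespace and Pod name are placeholders, and the exact Pod name depends on your release name:
# Find the cleanup job Pod in the Deployment's Airflow namespace
kubectl get pods -n <your-airflow-namespace> | grep meta-cleanup-job
# Read its logs
kubectl logs -n <your-airflow-namespace> <release-name>-meta-cleanup-job-<pod-id>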
UI access
Go to the Logs tab on your Deployment's page and select the AirflowMetaCleanup tab to access the logs. This view is only supported for FluentD at this time.