Useful SQL queries for Apache Airflow
Get total completed task count
SELECT COUNT(1) FROM task_instance WHERE state IS NOT NULL AND state NOT IN ('scheduled', 'queued');
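The filter above counts every task instance whose state is anything other than scheduled or queued, so tasks that are still running are included in the total. As a rough cross-check, a per-state breakdown shows what that total is made of. This is a minimal sketch that assumes only the task_instance.state column already used above; the task_count alias is introduced here for illustration.

```sql
-- Count task instances per state, largest groups first.
-- Rows with a NULL state (never scheduled) appear as their own group.
SELECT state, COUNT(1) AS task_count
FROM task_instance
GROUP BY state
ORDER BY task_count DESC;
```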
Get tasks started per hour for past week
SELECT date_trunc('hour', start_date) AS d, count(1) FROM task_instance WHERE start_date IS NOT NULL GROUP BY d ORDER BY 1 DESC LIMIT 24*7;
Get tasks finished per hour for past week
SELECT date_trunc('hour', end_date) AS d, count(1) FROM task_instance WHERE state IN ('skipped', 'success', 'failed') AND end_date IS NOT NULL GROUP BY d ORDER BY 1 DESC LIMIT 24*7;
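LIMIT 24*7 returns the 168 most recent hourly buckets that contain at least one task, which only equals the past week if every hour had activity. If a strict calendar window is preferred, one possible variant filters on the timestamp instead. This sketch assumes the database session time zone matches the one Airflow uses when writing start_date (commonly UTC).

```sql
-- Tasks started per hour, restricted to the last 7 days by timestamp.
-- Hours with no tasks are simply absent from the result.
SELECT date_trunc('hour', start_date) AS d, count(1)
FROM task_instance
WHERE start_date >= now() - INTERVAL '7 days'
GROUP BY d
ORDER BY d DESC;
```

The same pattern should work for finished tasks by swapping start_date for end_date and keeping the state filter from the query above.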
Unpause a list of paused DAGs
UPDATE dag SET is_paused = FALSE WHERE is_paused IS TRUE AND dag_id IN ( 'clickstream_v2_to_redshift__xxx', 'clickstream_v2_to_redshift__yyy', 'clickstream_v2_to_redshift__zzz' );
Pause all active DAGs and unpause with a temp table
We use this to limit the impact of prod rollouts: only one or two Astronomer DAGs are affected first, before the change reaches all customers.
Change dag_tmp to something unique and make sure it doesn't exist first.
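-- Save the list of currently active (unpaused) DAGs.
SELECT dag_id INTO dag_tmp FROM dag WHERE is_paused IS FALSE;
-- Pause every DAG in that list.
UPDATE dag SET is_paused = TRUE FROM dag_tmp WHERE dag.dag_id = dag_tmp.dag_id;
-- Later, once the rollout has been verified, unpause the same DAGs and drop the table.
UPDATE dag SET is_paused = FALSE FROM dag_tmp WHERE dag.dag_id = dag_tmp.dag_id;
DROP TABLE dag_tmp;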
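Two steps around the statements above can be sketched in SQL as well: checking that the scratch table name is free, and unpausing only one or two DAGs before the rest. This is an illustration rather than the exact procedure used in production. The DAG id is the placeholder from the earlier unpause example, and the information_schema lookup assumes PostgreSQL.

```sql
-- Before creating the table: this should return zero rows if the name is free.
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_name = 'dag_tmp';

-- After pausing everything: unpause only the chosen DAGs (placeholder id),
-- and only if they were active before the rollout.
UPDATE dag
SET is_paused = FALSE
FROM dag_tmp
WHERE dag.dag_id = dag_tmp.dag_id
  AND dag.dag_id IN ('clickstream_v2_to_redshift__xxx');
```

Keeping the join against dag_tmp means a DAG that was already paused before the rollout stays paused, even if its id is listed by mistake.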
Delete a DAG completely
Deleting the DAG file itself leaves traces across 7 database tables, such as those for DAG runs and task instances.
Sometimes we need to completely blow out these rows for a certain DAG to re-run it from scratch, move the start date forward or backward, etc.
The next release of Airflow after 1.9 will include a delete_dag command in the CLI and a matching endpoint in the REST API. For Airflow versions through 1.9, use the following statements.
DELETE FROM xcom WHERE dag_id = 'my_dag_id';
DELETE FROM task_instance WHERE dag_id = 'my_dag_id';
DELETE FROM sla_miss WHERE dag_id = 'my_dag_id';
DELETE FROM log WHERE dag_id = 'my_dag_id';
DELETE FROM job WHERE dag_id = 'my_dag_id';
DELETE FROM dag_run WHERE dag_id = 'my_dag_id';
DELETE FROM dag WHERE dag_id = 'my_dag_id';
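Because these deletes cannot be undone, it may help to look at how many rows each table holds for the DAG first, and to run the same query afterwards to confirm that everything is gone. The following is a sketch using the same seven tables and the 'my_dag_id' placeholder; the tbl and row_count aliases are introduced here for illustration.

```sql
-- Row counts per metadata table for one DAG (read-only, safe to run any time).
SELECT 'xcom' AS tbl, count(1) AS row_count FROM xcom WHERE dag_id = 'my_dag_id'
UNION ALL
SELECT 'task_instance', count(1) FROM task_instance WHERE dag_id = 'my_dag_id'
UNION ALL
SELECT 'sla_miss', count(1) FROM sla_miss WHERE dag_id = 'my_dag_id'
UNION ALL
SELECT 'log', count(1) FROM log WHERE dag_id = 'my_dag_id'
UNION ALL
SELECT 'job', count(1) FROM job WHERE dag_id = 'my_dag_id'
UNION ALL
SELECT 'dag_run', count(1) FROM dag_run WHERE dag_id = 'my_dag_id'
UNION ALL
SELECT 'dag', count(1) FROM dag WHERE dag_id = 'my_dag_id';
```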
Rewinding a DAG
To rewind a DAG:
- Turn the DAG off in Airflow.
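- Blow out the Airflow metadata for that DAG, using the statements under "Delete a DAG completely".
- The DAG will then be recreated automatically and started from the new config.
Wait for the cache to pick up the new config before the second step: if you blow out the metadata before the cache has updated, the DAG is re-created with the old data.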
Fast Forwarding a DAG
You can fast forward a DAG by generating fake DAG runs in the Airflow metadata database.
First determine the timestamp of the latest DAG run:
-- NOTE: PAUSE THE DAG FIRST
-- change to your desired dag_id
select max(execution_date) from dag_run where dag_id = 'clickstream_v2_to_redshift__59ca877951ad6e2f93f870c5';
Take the timestamp returned by the first query and add 1 hour (in this example the result was 5:15 AM, so 6:15 AM is used below), then put the new value in both places where a timestamp appears in the second query, the execution_date and the run_id:
insert into dag_run(dag_id, execution_date, run_id, state) values ( 'clickstream_v2_to_redshift__59ca877951ad6e2f93f870c5', '2018-04-27 06:15:00.000000', 'scheduled__2018-04-27T06:15:00__fake', 'failed' );
Repeat the insert for each following hour. If you want to go all the way up to (but not including) 2018-05-09 00:00 UTC, then the last fake DAG run to create is '2018-05-08 23:15:00.000000'.
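Rather than running one insert per hour by hand, the whole range can be generated in a single statement. This is a sketch that assumes PostgreSQL (it relies on generate_series and to_char) and an hourly schedule; the start and end values are the example timestamps from this section, and the alias g(ts) is introduced here for illustration.

```sql
-- Insert one fake, failed DAG run per hour from the first missing run
-- through the last one, both bounds inclusive.
INSERT INTO dag_run (dag_id, execution_date, run_id, state)
SELECT
    'clickstream_v2_to_redshift__59ca877951ad6e2f93f870c5',
    g.ts,
    -- Builds run ids such as scheduled__2018-04-27T06:15:00__fake
    'scheduled__' || to_char(g.ts, 'YYYY-MM-DD"T"HH24:MI:SS') || '__fake',
    'failed'
FROM generate_series(
    TIMESTAMP '2018-04-27 06:15:00',
    TIMESTAMP '2018-05-08 23:15:00',
    INTERVAL '1 hour'
) AS g(ts);
```

Running only the SELECT part first, without the INSERT line, is a cheap way to preview the rows before anything is written.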