Airflow Edge Cases on Astro
Apache Airflow offers enormous flexibility in how it can be used. While this flexibility is a benefit, it also means there are a few edge-case behaviors that can be unintuitive.
Workers Scaling Down Frequently
Astro guarantees that each task has at least 24 hours to complete before being disrupted by a scaling event. This means that when a worker is marked for scale-down, it can't actually be removed until the last task on it completes or 24 hours have passed. If that last task runs considerably longer than the other tasks on the worker, a single task can keep the worker alive for a long time, which is an inefficient use of resources. The problem is most pronounced when the queue scales up and down frequently. Workers that are waiting on a task to complete before shutting down are called "terminating" workers and do not count against the queue's maximum number of workers. As a result, a worker queue can have more workers running than its configured maximum, which increases cost.
Walking through a worst-case scenario, imagine that every hour, 100 tasks start on a worker queue with a worker concurrency of 10 (that is, 10 tasks per worker). Of the 100 tasks, 5 take 3 hours to complete, while the rest finish within 15 minutes. At the top of the hour, 10 workers spin up to perform the work. After 15 minutes, all but 5 of the tasks have completed, so Astro determines that only 1 worker is needed for the 5 remaining tasks. However, 4 other workers are kept in a terminating state while they finish their long-running tasks. When the next hour begins, 10 new workers come up for the new batch of tasks, in addition to the 4 terminating workers. At 1 hour and 15 minutes the pattern repeats, and there are now 9 terminating workers. At the 2-hour mark, the worker queue has 19 workers in total, nearly double what you might normally expect from this queue.
The solution is to avoid putting tasks with wildly different execution times on the same worker queue. Run long-running tasks on a separate worker queue so that it scales up and down less frequently, and so that when it does scale down, a terminating worker is finishing multiple tasks of similar duration, making more efficient use of it.
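As a minimal sketch, tasks can be routed to a dedicated worker queue with the operator-level queue parameter, which Astro maps to the worker queue of the same name. The queue name "heavy", the dag ID, and the task bodies below are placeholders; the assumption is that a worker queue named "heavy" already exists on the deployment.

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 1, 1), schedule="@hourly", catchup=False)
def split_by_duration():
    # Short tasks stay on the deployment's default worker queue.
    @task
    def quick_report():
        ...

    # Long-running work is routed to a dedicated worker queue (here named
    # "heavy"), so the default queue can scale down without waiting on it.
    @task(queue="heavy")
    def long_backfill():
        ...

    quick_report()
    long_backfill()


split_by_duration()
```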
More than 1000 Deferred Tasks
With default settings, an Astro deployment only evaluates the conditions of 1000 deferred tasks at a time; any additional deferred tasks wait unchecked until one of the 1000 has its condition met and frees up a slot. Airflow gives no indication of which deferred tasks are actually being checked. If the triggerer has CPU and memory headroom according to your Analytics page, you can increase default_capacity in Airflow 2 or capacity in Airflow 3. If you need to run more than 1000 deferred tasks at a time and do not have the CPU or memory headroom to increase capacity, contact Astronomer Support.
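For example, the setting can be raised through Airflow's standard environment-variable override for config options. The value of 2000 below is only an illustration; the right number depends on the triggerer's available CPU and memory.

```
# Airflow 2: [triggerer] default_capacity
AIRFLOW__TRIGGERER__DEFAULT_CAPACITY=2000

# Airflow 3: [triggerer] capacity
AIRFLOW__TRIGGERER__CAPACITY=2000
```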
Impact of Long Dag Parse Times
You may be familiar with the best practice of avoiding top-level code in dags, because it leads to long dag parse times. What may be less clear is the impact of an increased parse time. First, the longer your dags take to parse, the longer it takes for changes to those dags to show up in Airflow's UI and scheduling behavior. All dags are parsed continuously in a loop, and the longer it takes to get through that loop, the longer it takes for changes to take effect.
Additionally, the worker must parse the dag file before it can actually run a task. While the dag file is being parsed, the task remains in the queued state, so a dag that takes a long time to parse will see long queued times for all of its tasks.
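As an illustration of the pattern to avoid, the sketch below moves an expensive call out of module-level code, where it would run on every parse, and into a task, where it runs only at execution time. The api.example.com endpoint and the dag and task names are hypothetical.

```python
from datetime import datetime

import requests
from airflow.decorators import dag, task

# Anti-pattern: this request would run every time the scheduler or a worker
# parses the file, slowing the parse loop and delaying queued tasks.
# config = requests.get("https://api.example.com/config", timeout=30).json()


@dag(start_date=datetime(2024, 1, 1), schedule="@daily", catchup=False)
def parse_friendly_dag():
    @task
    def fetch_config() -> dict:
        # The request now runs only when the task executes, not at parse time.
        return requests.get("https://api.example.com/config", timeout=30).json()

    @task
    def use_config(config: dict):
        print(config)

    use_config(fetch_config())


parse_friendly_dag()
```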
Logging and Out of Memory on Tasks
Task logs are retrieved in two ways. While a task is running, its logs are read directly from the worker via a small webserver that the worker runs for this purpose; this is what enables "live logging". After the task completes, the worker uploads its logs to cloud storage. However, if the worker is killed by an out-of-memory (OOM) error, it dies before it can upload the logs of its currently running tasks, and it is no longer around to serve live logs either. As a result, a worker OOM leaves its tasks with no logs at all. After the OOM, those tasks appear to be Running for roughly another 5 minutes¹, at which point the scheduler notices that it has not received a periodic heartbeat from the tasks for too long. It marks each task as failed and records in the task's Event Log that the task was killed because of a missing task heartbeat. So if you see a task with no logs but a missing-heartbeat event in its Event Log, it is almost certainly an out-of-memory error.
Footnotes
1. Unless configured for a different amount of time