How everything fits together
Airflow consists of four core components:
Webserver: Airflow's UI. At its core, this is just a Flask app that displays the status of your jobs, provides an interface to interact with the database, and reads logs from a remote file store (S3, Google Cloud Storage, Azure Blob Storage, Elasticsearch, etc.).
Scheduler: This is responsible for scheduling jobs. It is a multithreaded Python process that uses the DAG object with the state of tasks in the metadata database to decide what tasks need to be run, when they need to be run, and where they are run.
Executor: The mechanism by which work actually gets done. There are a few different varieties of executors, each with its own strengths and weaknesses.
Metadata Database: A database (usually Postgres, but it can be anything with SQLAlchemy support) that powers how the other components interact. The scheduler stores and updates task statuses, which the webserver then uses to display job information.
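The way the metadata database mediates between the scheduler and the webserver can be sketched with an in-memory SQLite database. This is only a toy model: the task_state table and both helper functions are hypothetical names chosen for illustration and do not reflect Airflow's real schema or API.

```python
import sqlite3

# One shared database stands in for the metadata database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_state ("
    "dag_id TEXT, task_id TEXT, state TEXT, "
    "PRIMARY KEY (dag_id, task_id))"
)


def scheduler_set_state(dag_id, task_id, state):
    """Hypothetical scheduler-side write: store or update a task's status."""
    conn.execute(
        "INSERT OR REPLACE INTO task_state VALUES (?, ?, ?)",
        (dag_id, task_id, state),
    )
    conn.commit()


def webserver_list_states(dag_id):
    """Hypothetical webserver-side read: fetch the statuses to display."""
    rows = conn.execute(
        "SELECT task_id, state FROM task_state WHERE dag_id = ? ORDER BY task_id",
        (dag_id,),
    )
    return dict(rows.fetchall())


# The scheduler writes and later updates statuses...
scheduler_set_state("etl", "extract", "running")
scheduler_set_state("etl", "extract", "success")
scheduler_set_state("etl", "load", "queued")

# ...and the webserver only ever reads what is in the database.
states = webserver_list_states("etl")
print(states)
```

The two sides never call each other directly; the database is the only thing they share.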
How does work get scheduled?
Once the scheduler is started:
1) The scheduler "taps" the dags folder and instantiates all DAG objects in the metadata database. Each DAG gets a configurable number of processes.
Note: This means all top-level code (i.e., anything that isn't defining the DAG) in a DAG file will get run on each scheduler heartbeat. Try to avoid top-level code in your DAG file unless absolutely necessary.
2) Each process parses the DAG file and creates the necessary DagRuns based on the scheduling parameters of each DAG's tasks. A TaskInstance is instantiated for each task that needs to be executed. These TaskInstances are set to SCHEDULED in the metadata database.
3) The primary scheduler process queries the database for all tasks in the SCHEDULED state and sends them to the executors to be executed (with their state changed to QUEUED).
4) Depending on the execution setup, workers pull tasks from the queue and start executing them. Tasks that are pulled off the queue are changed from "queued" to "running."
5) When a task finishes, the worker changes the status of that task to its final state (finished, failed, etc.). The scheduler then reflects this change in the metadata database.
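The state changes described in steps 2 through 5 can be sketched as a small state machine. This is a simplified model for illustration, not Airflow's implementation: the State enum, the TRANSITIONS table, and the advance function are hypothetical names, and real deployments have more states than the ones listed here.

```python
from enum import Enum


class State(Enum):
    """Simplified subset of task states, following the steps above."""
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Which states a task may move to from each state (assumed, simplified).
TRANSITIONS = {
    State.SCHEDULED: {State.QUEUED},              # step 3: sent to the executor
    State.QUEUED: {State.RUNNING},                # step 4: picked up by a worker
    State.RUNNING: {State.SUCCESS, State.FAILED}, # step 5: final state
}


def advance(current, target):
    """Return the new state, or raise if the transition is not allowed."""
    if target not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


# Walk one task through the happy path.
state = State.SCHEDULED
history = [state]
for target in (State.QUEUED, State.RUNNING, State.SUCCESS):
    state = advance(state, target)
    history.append(state)

print(" -> ".join(s.value for s in history))
```

Modelling the transitions as a lookup table makes it easy to see that a task cannot skip the queue: moving straight from scheduled to running raises an error.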
# https://github.com/apache/incubator-airflow/blob/2d50ba43366f646e9391a981083623caa12e8967/airflow/jobs.py#L1386
def _process_dags(self, dagbag, dags, tis_out):
    """
    Iterates over the dags and processes them. Processing includes:

    1. Create appropriate DagRun(s) in the DB.
    2. Create appropriate TaskInstance(s) in the DB.
    3. Send emails for tasks that have missed SLAs.

    :param dagbag: a collection of DAGs to process
    :type dagbag: models.DagBag
    :param dags: the DAGs from the DagBag to process
    :type dags: DAG
    :param tis_out: A queue to add generated TaskInstance objects
    :type tis_out: multiprocessing.Queue[TaskInstance]
    :return: None
    """
    for dag in dags:
        dag = dagbag.get_dag(dag.dag_id)
        if dag.is_paused:
            self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
            continue

        if not dag:
            self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)
            continue

        self.log.info("Processing %s", dag.dag_id)

        dag_run = self.create_dag_run(dag)
        if dag_run:
            self.log.info("Created %s", dag_run)
        self._process_task_instances(dag, tis_out)
        self.manage_slas(dag)

    models.DagStat.update([d.dag_id for d in dags])
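Returning to the note under step 1, the cost of top-level code can be illustrated with a toy simulation. It does not use Airflow's parser: the two DAG file strings, expensive_lookup, and parse are hypothetical stand-ins, and the loop only assumes that a DAG file is re-executed once per heartbeat.

```python
# Counts how often the expensive call runs in each style of DAG file.
calls = {"top_level": 0, "deferred": 0}


def expensive_lookup(kind):
    """Stand-in for a slow call such as a database query or API request."""
    calls[kind] += 1
    return ["a", "b"]


# Hypothetical DAG file with the expensive call at module top level.
BAD_DAG_FILE = """
tables = expensive_lookup("top_level")   # runs on every parse

def run_task():
    return tables
"""

# The same file with the call moved inside the task callable.
GOOD_DAG_FILE = """
def run_task():
    return expensive_lookup("deferred")  # runs only when the task executes
"""


def parse(source):
    """Execute a DAG file's source once and return its namespace."""
    namespace = {"expensive_lookup": expensive_lookup}
    exec(source, namespace)
    return namespace


HEARTBEATS = 5
for _ in range(HEARTBEATS):
    parse(BAD_DAG_FILE)
    good = parse(GOOD_DAG_FILE)

# Execute the task once, as a worker would.
result = good["run_task"]()

print(calls)
```

After five simulated heartbeats and one task run, the top-level version has made the expensive call five times, while the deferred version has made it once.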
Controlling Component Interactions
Fine tuning airflow.cfg
How often these components interact can be set through airflow.cfg. This file exposes several Airflow settings that can be tuned for a given use case.
This file is well documented, but a few notes:
Out of the box, Airflow can use the LocalExecutor, the SequentialExecutor, or the CeleryExecutor.
The SequentialExecutor just executes tasks sequentially, with no parallelism or concurrency. It is good for a test environment or when debugging deeper Airflow bugs.
The LocalExecutor runs tasks in parallel as separate processes on one machine, which makes it a good fit for Airflow running on a local machine or a single node.
The CeleryExecutor is the preferred method to run a distributed Airflow cluster. It requires Redis, RabbitMQ, or another message queue system to coordinate tasks between workers.
There are also a community-contributed MesosExecutor and KubernetesExecutor that can execute tasks across larger clusters, but neither of them is currently production ready.
The parallelism, dag_concurrency, and max_active_runs_per_dag settings can be tweaked to determine how many tasks can be executed at once.
It is important to note that parallelism determines how many task instances can run in parallel in the executor, while dag_concurrency determines how many task instances the scheduler allows to run concurrently within a single DAG. These two numbers should be fine-tuned together when optimizing an Airflow deployment, with the ratio depending on the number of DAGs.
max_active_runs_per_dag applies to each individual DAG: it sets how many DagRuns across time can be scheduled for a single DAG at once. This number should depend on how long DAGs take to execute, their schedule interval, and scheduler performance.
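How parallelism and dag_concurrency combine can be sketched with a little arithmetic. This is a simplified model, assuming parallelism is an installation-wide cap and dag_concurrency is applied per DAG; open_slots is a hypothetical helper, not an Airflow function, and the numbers are illustrative only.

```python
def open_slots(parallelism, dag_concurrency, running_total, running_in_dag):
    """How many more tasks of one DAG could start right now (simplified).

    parallelism      -- cap on task instances running across the executor
    dag_concurrency  -- cap on task instances running within one DAG (assumed)
    running_total    -- task instances currently running overall
    running_in_dag   -- task instances currently running in this DAG
    """
    executor_room = parallelism - running_total
    dag_room = dag_concurrency - running_in_dag
    # The tighter of the two limits wins; never report negative room.
    return max(0, min(executor_room, dag_room))


# The executor is nearly full, so it is the binding limit.
print(open_slots(parallelism=32, dag_concurrency=16, running_total=30, running_in_dag=4))

# The DAG has hit its own cap even though the executor has room.
print(open_slots(parallelism=32, dag_concurrency=16, running_total=10, running_in_dag=16))
```

The first call leaves room for 2 more tasks and the second for 0, which is why the two settings are best tuned together rather than one at a time.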
job_heartbeat_sec determines how often the scheduler listens for external kill signals, while scheduler_heartbeat_sec determines how often it looks for new tasks.
As the cluster grows in size, each scheduler heartbeat gets increasingly expensive. Depending on the infrastructure, how long tasks generally take, and how the scheduler performs, consider increasing scheduler_heartbeat_sec from its default.
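To see where the settings discussed above live, a fragment of airflow.cfg can be read with Python's standard configparser. This is a sketch under assumptions: the section placement follows the Airflow 1.x layout ([core] and [scheduler]), and the values are illustrative rather than recommendations.

```python
import configparser

# Illustrative airflow.cfg fragment; values are examples, not advice.
SAMPLE_CFG = """
[core]
executor = LocalExecutor
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 16

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
"""

parser = configparser.ConfigParser()
parser.read_string(SAMPLE_CFG)

# Collect the tuning knobs discussed above into one dictionary.
settings = {
    "executor": parser.get("core", "executor"),
    "parallelism": parser.getint("core", "parallelism"),
    "dag_concurrency": parser.getint("core", "dag_concurrency"),
    "max_active_runs_per_dag": parser.getint("core", "max_active_runs_per_dag"),
    "job_heartbeat_sec": parser.getint("scheduler", "job_heartbeat_sec"),
    "scheduler_heartbeat_sec": parser.getint("scheduler", "scheduler_heartbeat_sec"),
}

for name, value in settings.items():
    print(f"{name} = {value}")
```

Reading the file this way is handy for a quick audit of a deployment's tuning before changing anything.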