Managing Dependencies in Apache Airflow

Managing Dependencies

The logic between tasks

Each DAG object contains a set of tasks that are related by dependencies - the "directed" in directed acyclic graph.

Dependencies are one of Airflow's most powerful and popular features - they allow for previously long, brittle jobs to be broken down into granular parts that are safer, more modular, and reusable.

Simple Dependencies

Dependencies can be set with the set_upstream and set_downstream methods or through bitshift operators.


This logic can be set in any of the following ways:

  • d1.set_downstream(d2)

    d2.set_downstream(d3)

    d3.set_downstream(d4)


  • d4.set_upstream(d3)

    d3.set_upstream(d2)

    d2.set_upstream(d1)


  • d1 >> d2 >> d3 >> d4


  • d4 << d3 << d2 << d1

Any of these is in line with best practice as long as the style is applied consistently - do not mix them, as in:

  • d1.set_downstream(d2)

    d2 >> d3

    d4.set_upstream(d3)
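
For reference, here is a minimal sketch of the consistent bitshift style, assuming Airflow 1.x imports and an already-defined dag object:

from airflow.operators.dummy_operator import DummyOperator

with dag:
    d1 = DummyOperator(task_id='d1')
    d2 = DummyOperator(task_id='d2')
    d3 = DummyOperator(task_id='d3')
    d4 = DummyOperator(task_id='d4')

    # d1 runs first, then d2, then d3, then d4
    d1 >> d2 >> d3 >> d4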

Dynamically Setting Dependencies

For a large number of tasks, dependencies can be set in loops:

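For example, a sketch that fans several tasks out from a single starting task - the task names are hypothetical and a dag object is assumed:

with dag:
    start = DummyOperator(task_id='kick_off_dag')

    # Each generated task gets a unique task_id and is set downstream of start
    for i in range(0, 10):
        task = DummyOperator(task_id='generate_file_{0}'.format(i))
        start >> task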

Recall that tasks are identified by their task_id and associated DAG object - not by the type of operator. Consider this:

with dag:

    final_task = DummyOperator(task_id='final')

    for i in range(0, 3):
        d1 = DummyOperator(task_id='task_{0}'.format(i))
        for j in range(0, 3):
            d2 = PythonOperator(task_id='task_{0}'.format(i),
                                python_callable=test_callable,
                                provide_context=True)

            d1 >> d2 >> final_task

What happens here? Because each PythonOperator in the inner loop reuses the same task_id as d1 ('task_{0}'.format(i)), the DAG does not contain the tasks you might expect - depending on the Airflow version, duplicate task IDs either raise an exception or silently replace the previously added task.

Trigger Rules

Complex Dependencies

By default, a task is triggered once all of its upstream tasks have succeeded. However, more complex trigger rules can be implemented.

Operators have a trigger_rule that defines how the task gets triggered. The default all_success rule dictates that the task should be triggered when all upstream dependent tasks have reached the success state.

Each trigger rule has specific use cases:

  • all_success: (default) all parents have succeeded

  • all_failed: all parents are in a failed or upstream_failed state

  • all_done: all parents are done with their execution

  • one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done

  • one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done

  • dummy: dependencies are just for show, trigger at will

Trigger rules are defined in Airflow's utils module.
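
For reference, a sketch of importing and referencing them in Airflow 1.x - the commented usage line is illustrative:

from airflow.utils.trigger_rule import TriggerRule

# Passed to any operator via the trigger_rule argument, e.g.:
# DummyOperator(task_id='cleanup', trigger_rule=TriggerRule.ALL_DONE)
TriggerRule.ALL_SUCCESS   # 'all_success'
TriggerRule.ALL_FAILED    # 'all_failed'
TriggerRule.ALL_DONE      # 'all_done'
TriggerRule.ONE_FAILED    # 'one_failed'
TriggerRule.ONE_SUCCESS   # 'one_success'
TriggerRule.DUMMY         # 'dummy'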

Different Use Cases

Extendability vs Safety

The one_failed rule

When you have a critically important but brittle task in a workflow (e.g. a large machine learning job or a reporting task), a good safety check is to add a task that handles the failure logic. This logic can be implemented dynamically based on how the DAG is being generated.

# Define the tasks that are "brittle."
# Generally advisable when working with data drops from vendors/FTPs

job_info = [
    {
        'job_name': 'train_model',
        'brittle': True
    },
    {
        'job_name': 'execute_query',
        'brittle': False
    }]


# Define some failure handling

def fail_logic(**kwargs):
    # Implement fail_logic here.
    return


with dag:
    for job in job_info:
        d1 = DummyOperator(task_id=job['job_name'])
        
        # Generate a task based on a condition
        
        if job['brittle']:
            d2 = PythonOperator(task_id='{0}_{1}'.format(job['job_name'],
                                                         'fail_logic'),
                                python_callable=fail_logic,
                                provide_context=True,
                                trigger_rule=TriggerRule.ONE_FAILED)
            d1 >> d2
        for i in range(0, 5):
            downstream = DummyOperator(
                task_id='{0}_{1}'.format(job['job_name'], i))

            d1 >> downstream


Note: Similar logic can be implemented by specifying an on_failure_callback when using a PythonOperator. A trigger_rule is better suited for triggering a custom operator.
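
A minimal sketch of the callback approach - handle_failure and train are hypothetical names, and a dag object is assumed:

from airflow.operators.python_operator import PythonOperator


def handle_failure(context):
    # Airflow calls this with the task context when the task fails;
    # implement alerting or cleanup here.
    return


def train(**kwargs):
    # Hypothetical stand-in for the brittle job.
    return


with dag:
    train_model = PythonOperator(task_id='train_model',
                                 python_callable=train,
                                 provide_context=True,
                                 on_failure_callback=handle_failure)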

Though trigger rules can be convenient, they can also be unsafe and the same logic can usually be implemented using safer features.

A common use case of exotic trigger rules is a task downstream of all other tasks that kicks off the necessary logic.

The one_success rule

This rule is particularly helpful when setting up a "safety check" DAG - a DAG that runs a safety check against all of your data. If one of the "disaster checks" comes back as True, the downstream disaster task can run the necessary logic.

Note: The same logic can be implemented with the one_failed rule.
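
A sketch of that pattern - the check names and callables are hypothetical, and a dag object is assumed:

def run_disaster_check(**kwargs):
    # Hypothetical check; succeed only when a disaster condition is detected.
    return


def handle_disaster(**kwargs):
    # Runs as soon as at least one upstream check succeeds.
    return


with dag:
    disaster_task = PythonOperator(task_id='handle_disaster',
                                   python_callable=handle_disaster,
                                   provide_context=True,
                                   trigger_rule=TriggerRule.ONE_SUCCESS)

    for source in ['s3_drop', 'warehouse', 'api']:
        check = PythonOperator(task_id='check_{0}'.format(source),
                               python_callable=run_disaster_check,
                               provide_context=True)
        check >> disaster_task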

The all_failed rule

all_failed tells a task to run when all upstream tasks have failed and can be used to execute a fail condition for a workflow.
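
For example, a structural sketch where report_failure (a hypothetical task) only fires if every upstream job fails; DummyOperators stand in for real jobs:

with dag:
    report_failure = DummyOperator(task_id='report_failure',
                                   trigger_rule=TriggerRule.ALL_FAILED)

    for i in range(0, 3):
        job = DummyOperator(task_id='job_{0}'.format(i))
        job >> report_failure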

Note: In a run where the upstream tasks succeed, the final all_failed task is set to skipped.

Once again, the same functionality can be achieved by using the BranchPythonOperator, a TriggerDagRunOperator, or by configuring reporting in a more specific way.
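
For instance, a BranchPythonOperator sketch - the callable and task names are hypothetical; the branch it returns runs and the other is skipped:

from airflow.operators.python_operator import BranchPythonOperator


def choose_branch(**kwargs):
    # Hypothetical condition based on the execution date.
    if kwargs['execution_date'].day == 1:
        return 'monthly_report'
    return 'daily_report'


with dag:
    branch = BranchPythonOperator(task_id='branch',
                                  python_callable=choose_branch,
                                  provide_context=True)

    branch >> DummyOperator(task_id='monthly_report')
    branch >> DummyOperator(task_id='daily_report')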

Triggers with LatestOnlyOperator

When scheduling DAGs with complex trigger rules and start dates in the past, there may be instances where certain tasks can run independently of time while others shouldn't.

The LatestOnlyOperator skips its downstream tasks for every DAG run except the most recent scheduled one. As above, these parameters can be set in the DAG configuration - the scheduling may get a bit messy, but it can save computing resources and add a layer of safety.

job_info = [
    {
        'job_name': 'train_model',
        'brittle': True,
        'latest_only': True
    },
    {
        'job_name': 'execute_query',
        'brittle': False,
        'latest_only': False
    }]

with dag:
    start = DummyOperator(task_id='kick_off_dag')
    for job in job_info:
        d1 = DummyOperator(task_id=job['job_name'])

        # Generate a task based on a condition

        if job['brittle']:
            d2 = PythonOperator(task_id='{0}_{1}'.format(job['job_name'],
                                                         'fail_logic'),
                                python_callable=fail_logic,
                                provide_context=True,
                                trigger_rule=TriggerRule.ONE_FAILED)
            d1 >> d2
        start >> d1

        if job['latest_only']:
            latest_only = LatestOnlyOperator(task_id='latest_only_{0}'
                                             .format(job['job_name']))
            d1 >> latest_only

        for i in range(0, 5):
            downstream = DummyOperator(
                task_id='{0}_{1}'.format(job['job_name'], i))
            if job['latest_only']:
                latest_only >> downstream
            else:
                d1 >> downstream



Computing resources would be saved on past DAG runs.

The latest run would execute the necessary downstream logic.

