Access the Apache Airflow context
The Airflow context is a dictionary containing information about a running DAG and its Airflow environment that can be accessed from a task. One of the most common values to retrieve from the Airflow context is the `ti` / `task_instance` keyword, which allows you to access attributes and methods of the TaskInstance object.
Other common reasons to access the Airflow context are:
- You want to use DAG-level parameters in your Airflow tasks.
- You want to use the DAG run's logical date in an Airflow task, for example as part of a file name.
- You want to explicitly push and pull values to XCom with a custom key.
Use this document to learn about the data stored in the Airflow context and how to access it.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Basic Airflow concepts. See Introduction to Apache Airflow.
- Basic Python. See the Python Documentation.
- Airflow operators. See Airflow operators.
Access the Airflow context
The Airflow context is available in all Airflow tasks. You can access information from the context using the following methods:
- Pass the `**context` argument to the function used in a `@task` decorated task or PythonOperator.
- Pass the `context` argument to an `@asset` decorated function, as sketched after the note below. See Assets and data-aware scheduling for more information.
- Use Jinja templating in traditional Airflow operators.
- Access the `context` kwarg in the `.execute` method of any traditional or custom operator.
You cannot access the Airflow context dictionary outside of an Airflow task.
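A minimal sketch of the `@asset` case, assuming Airflow 3's `asset` decorator from `airflow.sdk`; the asset name and schedule are illustrative:

```python
# A sketch, assuming Airflow 3's @asset decorator from airflow.sdk.
# The asset name and schedule are illustrative.
from airflow.sdk import asset


@asset(schedule="@daily")
def my_asset(context):
    # Inside the asset function, context behaves like the task context dictionary.
    print(context["dag_run"].run_type)
```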
Retrieve the Airflow context using the `@task` decorator or PythonOperator
To access the Airflow context in a `@task` decorated task or PythonOperator task, you need to add a `**context` argument to your task function. This will make the context available as a dictionary in your task.
The following code snippets show how to print out the full context dictionary from a task:
TaskFlow:

```python
# from airflow.sdk import task
from pprint import pprint


@task
def print_context(**context):
    pprint(context)
```

Traditional syntax:

```python
# from airflow.providers.standard.operators.python import PythonOperator
from pprint import pprint


def print_context_func(**context):
    pprint(context)


print_context = PythonOperator(
    task_id="print_context",
    python_callable=print_context_func,
)
```
Retrieve the Airflow context using Jinja templating
Many elements of the Airflow context can be accessed by using Jinja templating. You can get the list of all parameters that allow templates for any operator by printing out its `.template_fields` attribute.
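To see this in practice, the following snippet prints the templateable parameters of the BashOperator; the exact contents of the tuple depend on your Airflow and provider versions:

```python
from airflow.providers.standard.operators.bash import BashOperator

# Prints a tuple of templateable parameters, for example ('bash_command', 'env', 'cwd').
print(BashOperator.template_fields)
```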
For example, you can access a DAG run's logical date in the format `YYYY-MM-DD` by using the template `{{ ds }}` in the `bash_command` parameter of the BashOperator.
```python
# from airflow.providers.standard.operators.bash import BashOperator

print_logical_date = BashOperator(
    task_id="print_logical_date",
    bash_command="echo {{ ds }}",
)
```
It is also common to use Jinja templating to access XCom values in a templateable parameter of a traditional operator. In the code snippet below, the first task `return_greeting` will push the string `"Hello"` to XCom, and the second task `greet_friend` will use a Jinja template to pull that value from the `ti` (task instance) object of the Airflow context and print `Hello friend! :)` into the logs.
```python
# from airflow.providers.standard.operators.bash import BashOperator
# from airflow.sdk import task


@task
def return_greeting():
    return "Hello"


greet_friend = BashOperator(
    task_id="greet_friend",
    bash_command="echo '{{ ti.xcom_pull(task_ids='return_greeting') }} friend! :)'",
)

return_greeting() >> greet_friend
```
Find an up-to-date list of all available templates in the Airflow documentation. Learn more about using XComs to pass data between Airflow tasks in Pass data between tasks.
Retrieve the Airflow context using custom operators
In a traditional operator, the Airflow context is always passed to the `.execute` method using the `context` keyword argument. If you write a custom operator, you have to include a `context` kwarg in the `.execute` method, as shown in the following custom operator example.
```python
from airflow.sdk.bases.operator import BaseOperator


class PrintDAGIDOperator(BaseOperator):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        print(context["dag"].dag_id)
```
Common Airflow context values
This section gives an overview of the most commonly used keys in the Airflow context dictionary. To see an up-to-date list of all keys and their types, view the Airflow source code.
ti / task_instance
The `ti` or `task_instance` key contains the TaskInstance object. Its most commonly used methods are `.xcom_push` and `.xcom_pull`, which allow you to push and pull XComs.
The following DAG shows an example of using `context["ti"].xcom_push(...)` and `context["ti"].xcom_pull(...)` to explicitly pass data between tasks.
```python
from pendulum import datetime

from airflow.sdk import dag, task


@dag(
    start_date=datetime(2023, 6, 1),
    schedule=None,
    catchup=False,
)
def context_and_xcom():
    @task
    def upstream_task(**context):
        context["ti"].xcom_push(key="my_explicitly_pushed_xcom", value=23)
        return 19

    @task
    def downstream_task(passed_num, **context):
        returned_num = context["ti"].xcom_pull(
            task_ids="upstream_task", key="return_value"
        )
        explicit_num = context["ti"].xcom_pull(
            task_ids="upstream_task", key="my_explicitly_pushed_xcom"
        )

        print("Returned Num: ", returned_num)
        print("Passed Num: ", passed_num)
        print("Explicit Num: ", explicit_num)

    downstream_task(upstream_task())


context_and_xcom()
```
The `downstream_task` will print the following information to the logs:
```text
[2023-06-16, 13:14:11 UTC] {logging_mixin.py:149} INFO - Returned Num: 19
[2023-06-16, 13:14:11 UTC] {logging_mixin.py:149} INFO - Passed Num: 19
[2023-06-16, 13:14:11 UTC] {logging_mixin.py:149} INFO - Explicit Num: 23
```
Scheduling keys
One of the most common reasons to access the Airflow context in your tasks is to retrieve information about how their DAG runs are scheduled. A common pattern is to include the timestamp of the logical date in the names of files written from a DAG, which creates a unique file for each DAG run.
The task below creates a new text file in the `include` folder for each DAG run, with a timestamp in the filename in the format `YYYY-MM-DDTHH:MM:SS+00:00`. Refer to Templates reference for an up-to-date list of time-related keys in the context, and Jinja templating for more information on how to pass these values to templateable parameters of traditional operators.
```python
# from airflow.sdk import task


@task
def write_file_with_ts(**context):
    ts = context["ts"]
    with open(f"include/{ts}_hello.txt", "a") as f:
        f.write("Hello, World!")
```
dag_run
The `dag_run` key contains the DAG run object. A commonly used attribute of the DAG run object is `run_type`, which indicates how the DAG run was triggered, for example `scheduled`, `manual`, or `backfill`.
```python
# from airflow.sdk import task


@task
def print_dagrun_info(**context):
    print(context["dag_run"].run_type)
```
params
The `params` key contains a dictionary of all DAG- and task-level params that were passed to a specific task instance. Individual params can be accessed using their respective key.
```python
# from airflow.sdk import task


@task
def print_param(**context):
    print(context["params"]["my_favorite_param"])
```
Learn more about params in the Airflow params guide.
var
The `var` key contains all Airflow variables of your Airflow instance. Airflow variables are key-value pairs that are commonly used to store instance-level information that rarely changes.
```python
# from airflow.sdk import task


@task
def get_var_from_context(**context):
    print(context["var"]["value"].get("my_regular_var"))
    print(context["var"]["json"].get("my_json_var")["num2"])
```