Run tasks in an isolated environment in Apache Airflow

It is very common for a task to require different dependencies than your Airflow environment provides. Your task might need a different Python version than core Airflow, or it might use packages that conflict with those of your other tasks. In these cases, running tasks in an isolated environment can help manage dependency conflicts and enable compatibility with your execution environments.

In Airflow, you have several options for running custom Python code in isolated environments. This guide teaches you how to choose the right isolated environment option for your use case, implement different virtual environment operators and decorators, and access Airflow context and variables in isolated environments.

Other ways to learn

There are multiple resources for learning about this topic. See also:

This guide covers options to isolate individual tasks in Airflow. If you want to run all of your Airflow tasks in dedicated Kubernetes pods, consider using the Kubernetes Executor. Astronomer customers can set their Deployments to use the KubernetesExecutor in the Astro UI; see Manage Airflow executors on Astro.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

When to use isolated environments

There are two situations when you might want to run a task in an isolated environment:

  • Your task requires a different version of Python than your Airflow environment. Apache Airflow is compatible with and available in Python 3.8, 3.9, 3.10, 3.11, and 3.12. The Astro Runtime has images available for all supported Python versions, so you can run Airflow inside Docker in a reproducible environment. See Prerequisites for more information.

  • Your task requires different versions of Python packages that conflict with the package versions installed in your Airflow environment. To know which Python packages are pinned to which versions within Airflow, you can retrieve the full list of constraints for each Airflow version by going to:

    https://raw.githubusercontent.com/apache/airflow/constraints-<AIRFLOW VERSION>/constraints-<PYTHON VERSION>.txt
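
    For example, the constraints file for Airflow 2.8.1 on Python 3.11 (version numbers chosen for illustration) is at:

    https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.11.txt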
Airflow Best Practice

Make sure to pin all package versions, both in your core Airflow environment (requirements.txt) and in your isolated environments. This helps you avoid unexpected behavior due to package updates that might create version conflicts.
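
For example, a pinned requirements.txt could look like the following; the package choices and versions are illustrative assumptions:

pandas==1.5.1
numpy==1.26.4
scikit-learn==1.4.0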

Limitations

When creating isolated environments in Airflow, you might not be able to use common Airflow features or connect to your Airflow environment in the same way you would in a regular Airflow task.

Common limitations include:

  • Not all objects from the Airflow context can be used in an isolated environment. For example, the ti object cannot be passed into it. See Use Airflow context variables in isolated environments.
  • Installing Airflow itself and Airflow provider packages in an isolated environment can lead to unexpected behavior and is not recommended. See Use Airflow packages in isolated environments.

Choosing an isolated environment option

Airflow provides several options for running tasks in isolated environments.

To run tasks in a dedicated Kubernetes Pod you can use:

  • The @task.kubernetes decorator
  • The KubernetesPodOperator

To run tasks in a Python virtual environment you can use:

  • The @task.external_python decorator or ExternalPythonOperator
  • The @task.virtualenv decorator or PythonVirtualenvOperator
  • The @task.branch_external_python decorator or BranchExternalPythonOperator
  • The @task.branch_virtualenv decorator or BranchPythonVirtualenvOperator

The virtual environment decorators have operator equivalents with the same functionality. Astronomer recommends using decorators where possible because they simplify the handling of XCom.

Graph of options for isolated environments in Airflow.

Which option you choose depends on your use case and the requirements of your task. The table below shows which decorators and operators are best for particular use cases.

| Use Case | Implementation Options |
| --- | --- |
| Run a Python task in a K8s Pod | @task.kubernetes, KubernetesPodOperator |
| Run a Docker image without additional Python code in a K8s Pod | KubernetesPodOperator |
| Run a Python task in an existing (reusable) virtual environment | @task.external_python, ExternalPythonOperator |
| Run a Python task in a new virtual environment | @task.virtualenv, PythonVirtualenvOperator |
| Run branching code in an existing (reusable) virtual environment | @task.branch_external_python, BranchExternalPythonOperator |
| Run branching code in a new virtual environment | @task.branch_virtualenv, BranchPythonVirtualenvOperator |
| Install different packages for each run of a task | PythonVirtualenvOperator, BranchPythonVirtualenvOperator |

Another consideration when choosing an operator is the infrastructure you have available. Operators that run tasks in Kubernetes pods allow you to have full control over the environment and resources used, but they require a Kubernetes cluster. Operators that run tasks in Python virtual environments are easier to set up, but do not provide the same level of control over the environment and resources used.

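Operators that run tasks in Kubernetes pods also let you set explicit resource requests and limits per task. The following is a minimal sketch using the KubernetesPodOperator; the image name and resource values are illustrative assumptions:

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

my_resource_limited_task = KubernetesPodOperator(
    task_id="my_resource_limited_task",
    image="<YOUR IMAGE>",
    name="<YOUR POD NAME>",
    # explicitly request and cap the CPU and memory available to this task's pod
    container_resources=k8s.V1ResourceRequirements(
        requests={"cpu": "500m", "memory": "512Mi"},
        limits={"cpu": "1", "memory": "1Gi"},
    ),
)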

External Python operator

The External Python operator (the @task.external_python decorator or the ExternalPythonOperator) runs a Python function in an existing virtual Python environment that is isolated from your Airflow environment. To use the @task.external_python decorator or the ExternalPythonOperator, you need to create a separate Python environment to reference; the Python binary in that environment can be created by any method.

The easiest way to create a Python environment when using the Astro CLI is with the Astronomer PYENV BuildKit. The BuildKit can be used by adding a comment on the first line of the Dockerfile as shown in the following example. Adding this comment enables you to create virtual environments with the PYENV keyword.

# syntax=quay.io/astronomer/airflow-extensions:v1

FROM quay.io/astronomer/astro-runtime:10.3.0-python-3.11

# create a virtual environment for the ExternalPythonOperator and @task.external_python decorator
# using Python 3.9 and install the packages from epo_requirements.txt
PYENV 3.9 epo_pyenv epo_requirements.txt

To use the BuildKit, the Docker BuildKit Backend needs to be enabled. This is the default as of Docker Desktop version 23.0, but might need to be enabled manually in older versions of Docker.

You can add Python packages to the virtual environment by putting them into a separate requirements file, named epo_requirements.txt in this example. Make sure to pin all package versions.

pandas==1.4.4

Installing Airflow itself and Airflow provider packages in isolated environments can lead to unexpected behavior and is not recommended. If you need to use Airflow or Airflow provider modules inside your virtual environment, Astronomer recommends choosing the @task.virtualenv decorator or the PythonVirtualenvOperator. See Use Airflow packages in isolated environments.

After restarting your Airflow environment, you can use this Python binary by referencing the environment variable ASTRO_PYENV_<my-pyenv-name>. If you choose an alternative method to create your Python binary, you need to set the python parameter of the decorator or operator to the location of your Python binary.
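
For example, if you created the virtual environment with a different method, you can point the decorator directly at its Python executable. This is a minimal sketch; the binary path is a hypothetical assumption:

from airflow.decorators import task

@task.external_python(python="/usr/local/airflow/my_venv/bin/python")  # hypothetical path to your venv's Python binary
def my_task_in_custom_venv():
    # your code to run in the isolated environment
    ...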

To run any Python function in your virtual environment, use the @task.external_python decorator on it and set the python parameter to the location of your Python binary.

from airflow.decorators import task
import os

@task.external_python(python=os.environ["ASTRO_PYENV_epo_pyenv"])
def my_isolated_task():
    # import packages used in the isolated environment inside the function
    import sys
    import pandas as pd

    print(f"The python version in the virtual env is: {sys.version}")
    print(f"The pandas version in the virtual env is: {pd.__version__}")
    # your code to run in the isolated environment
Traditional

To run any Python function in your virtual environment, define the python_callable parameter of the ExternalPythonOperator with your Python function, and set the python parameter to the location of your Python binary.

from airflow.operators.python import ExternalPythonOperator
import os

def my_isolated_function():
    # import packages used in the isolated environment inside the function
    import sys
    import pandas as pd

    print(f"The python version in the virtual env is: {sys.version}")
    print(f"The pandas version in the virtual env is: {pd.__version__}")

my_isolated_task = ExternalPythonOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    python=os.environ["ASTRO_PYENV_epo_pyenv"],
)
TaskFlow XCom

You can pass information into and out of the @task.external_python decorated task the same way as you would when interacting with a @task decorated task. See Introduction to the TaskFlow API and Airflow decorators for more information.

1"""
2## Toy example of using the @task.external_python decorator
3
4The @task.external_python decorator is used to run any Python code in an existing isolated Python environment.
5"""
6
7from airflow.decorators import dag, task
8import pandas as pd
9import sys
10import os
11
12
13@dag(
14 start_date=None,
15 schedule=None,
16 doc_md=__doc__,
17 description="@task.external_python",
18 default_args={
19 "owner": "airflow",
20 "retries": 0,
21 },
22 tags=["@task.external_python"],
23)
24def external_python_decorator_dag():
25
26 @task
27 def upstream_task():
28 print(f"The python version in the upstream task is: {sys.version}")
29 print(f"The pandas version in the upstream task is: {pd.__version__}")
30 return {"num": 1, "word": "hello"}
31
32 @task.external_python(python=os.environ["ASTRO_PYENV_epo_pyenv"])
33 def my_isolated_task(upstream_task_output: dict):
34 """
35 This function runs in an isolated environment.
36 Args:
37 upstream_task_output (dict): contains a number and a word.
38 Returns:
39 pd.DataFrame: A dictionary containing the transformed inputs.
40 """
41 import pandas as pd
42 import sys
43
44 print(f"The python version in the virtual env is: {sys.version}")
45 print(f"The pandas version in the virtual env is: {pd.__version__}")
46
47 num = upstream_task_output["num"]
48 word = upstream_task_output["word"]
49
50 num_plus_one = num + 1
51 word_plus_exclamation = word + "!"
52
53 df = pd.DataFrame(
54 {
55 "num_plus_one": [num_plus_one],
56 "word_plus_exclamation": [word_plus_exclamation],
57 },
58 )
59
60 return df
61
62 @task
63 def downstream_task(arg):
64 print(f"The python version in the downstream task is: {sys.version}")
65 print(f"The pandas version in the downstream task is: {pd.__version__}")
66 return arg
67
68 downstream_task(my_isolated_task(upstream_task()))
69
70
71external_python_decorator_dag()
Traditional XCom

You can pass information into the ExternalPythonOperator by using a Jinja template to retrieve XCom values from the Airflow context. To pass information out of the ExternalPythonOperator, return it from the python_callable. Note that Jinja templates are rendered as strings unless you set render_template_as_native_obj=True in the DAG definition.

1"""
2## Toy example of using the ExternalPythonOperator
3
4The ExternalPythonOperator is used to run any Python code in an existing isolated Python environment.
5"""
6
7from airflow.decorators import dag, task
8from airflow.models.baseoperator import chain
9from airflow.operators.python import ExternalPythonOperator
10import pandas as pd
11import sys
12import os
13
14
15def my_isolated_function(num: int, word: str) -> dict:
16 """
17 This function will be passed to the ExternalPythonOperator to
18 run in an isolated environment.
19 Args:
20 num (int): An integer to be incremented by 1.
21 word (str): A string to have an exclamation mark added to it.
22 Returns:
23 pd.DataFrame: A dictionary containing the transformed inputs.
24 """
25 import pandas as pd
26 import sys
27
28 print(f"The python version in the virtual env is: {sys.version}")
29 print(f"The pandas version in the virtual env is: {pd.__version__}")
30
31 num_plus_one = num + 1
32 word_plus_exclamation = word + "!"
33
34 df = pd.DataFrame(
35 {
36 "num_plus_one": [num_plus_one],
37 "word_plus_exclamation": [word_plus_exclamation],
38 },
39 )
40
41 return df
42
43
44@dag(
45 start_date=None,
46 schedule=None,
47 doc_md=__doc__,
48 description="ExternalPythonOperator",
49 render_template_as_native_obj=True,
50 default_args={
51 "owner": "airflow",
52 "retries": 0,
53 },
54 tags=["ExternalPythonOperator"],
55)
56def external_python_operator_dag():
57
58 @task
59 def upstream_task():
60 print(f"The python version in the upstream task is: {sys.version}")
61 print(f"The pandas version in the upstream task is: {pd.__version__}")
62 return {"num": 1, "word": "hello"}
63
64 my_isolated_task = ExternalPythonOperator(
65 task_id="my_isolated_task",
66 python_callable=my_isolated_function,
67 python=os.environ["ASTRO_PYENV_epo_pyenv"],
68 op_kwargs={
69 # note that render_template_as_native_obj=True in the DAG definition
70 # to render num as an integer
71 "num": "{{ ti.xcom_pull(task_ids='upstream_task')['num']}}",
72 "word": "{{ ti.xcom_pull(task_ids='upstream_task')['word']}}",
73 },
74 )
75
76 @task
77 def downstream_task(arg):
78 print(f"The python version in the downstream task is: {sys.version}")
79 print(f"The pandas version in the downstream task is: {pd.__version__}")
80 return arg
81
82 chain(upstream_task(), my_isolated_task, downstream_task(my_isolated_task.output))
83
84
85external_python_operator_dag()

To get a list of all parameters of the @task.external_python decorator / ExternalPythonOperator, see the Astronomer Registry.

Virtualenv operator

The Virtualenv operator (@task.virtualenv or PythonVirtualenvOperator) creates a new virtual environment each time the task runs. If you only specify different package versions and use the same Python version as your Airflow environment, you do not need to create or specify a Python binary.

Installing Airflow itself and Airflow provider packages in isolated environments can lead to unexpected behavior and is generally not recommended. See Use Airflow packages in isolated environments.

Add the pinned versions of the packages to the requirements parameter of the @task.virtualenv decorator. The decorator creates a new virtual environment at runtime.

from airflow.decorators import task

@task.virtualenv(requirements=["pandas==1.5.1"])  # add your requirements to the list
def my_isolated_task():
    import pandas as pd  # import packages used in the isolated environment inside the function

    print(f"The pandas version in the virtual env is: {pd.__version__}")
    # your code to run in the isolated environment
Traditional

Add the pinned versions of the packages you need to the requirements parameter of the PythonVirtualenvOperator. The operator creates a new virtual environment at runtime.

from airflow.operators.python import PythonVirtualenvOperator

def my_isolated_function():
    import pandas as pd  # import packages used in the isolated environment inside the function

    print(f"The pandas version in the virtual env is: {pd.__version__}")
    # your code to run in the isolated environment

my_isolated_task = PythonVirtualenvOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    requirements=[
        "pandas==1.5.1",
    ],  # add your requirements to the list
)
TaskFlow XCom

You can pass information into and out of the @task.virtualenv decorated task using the same process as you would when interacting with a @task decorated task. See Introduction to the TaskFlow API and Airflow decorators for more detailed information.

1"""
2## Toy example of using the @task.virtualenv decorator
3
4The @task.virtualenv decorator is used to run any Python code in a new isolated Python environment.
5"""
6
7from airflow.decorators import dag, task
8import pandas as pd
9
10
11@dag(
12 start_date=None,
13 schedule=None,
14 doc_md=__doc__,
15 description="@task.virtualenv",
16 default_args={
17 "owner": "airflow",
18 "retries": 0,
19 },
20 tags=["@task.virtualenv"],
21)
22def virtualenv_decorator_dag():
23
24 @task
25 def upstream_task():
26 print(f"The pandas version in the upstream task is: {pd.__version__}")
27 return {"num": 1, "word": "hello"}
28
29 @task.virtualenv(requirements=["pandas==1.5.1"])
30 def my_isolated_task(upstream_task_output: dict):
31 """
32 This function runs in an isolated environment.
33 Args:
34 upstream_task_output (dict): contains a number and a word.
35 Returns:
36 pd.DataFrame: A dictionary containing the transformed inputs.
37 """
38 import pandas as pd
39
40 print(f"The pandas version in the virtual env is: {pd.__version__}")
41
42 num = upstream_task_output["num"]
43 word = upstream_task_output["word"]
44
45 num_plus_one = num + 1
46 word_plus_exclamation = word + "!"
47
48 df = pd.DataFrame(
49 {
50 "num_plus_one": [num_plus_one],
51 "word_plus_exclamation": [word_plus_exclamation],
52 },
53 )
54
55 return df
56
57 @task
58 def downstream_task(arg):
59 print(f"The pandas version in the downstream task is: {pd.__version__}")
60 return arg
61
62 downstream_task(my_isolated_task(upstream_task_output=upstream_task()))
63
64
65virtualenv_decorator_dag()
Traditional XCom

You can pass information into the PythonVirtualenvOperator by using a Jinja template to retrieve XCom values from the Airflow context. To pass information out of the PythonVirtualenvOperator, return it from the python_callable. Note that Jinja templates are rendered as strings unless you set render_template_as_native_obj=True in the DAG definition.

1"""
2## Toy example of using the PythonVirtualenvOperator
3
4The PythonVirtualenvOperator is used to run any Python code in a new isolated Python environment.
5"""
6
7from airflow.decorators import dag, task
8from airflow.models.baseoperator import chain
9from airflow.operators.python import PythonVirtualenvOperator
10import pandas as pd
11import sys
12
13
14def my_isolated_function(num: int, word: str) -> dict:
15 """
16 This function will be passed to the PythonVirtualenvOperator to
17 run in an isolated environment.
18 Args:
19 num (int): An integer to be incremented by 1.
20 word (str): A string to have an exclamation mark added to it.
21 Returns:
22 pd.DataFrame: A dictionary containing the transformed inputs.
23 """
24 import pandas as pd
25
26 print(f"The pandas version in the virtual env is: {pd.__version__}")
27
28 num_plus_one = num + 1
29 word_plus_exclamation = word + "!"
30
31 df = pd.DataFrame(
32 {
33 "num_plus_one": [num_plus_one],
34 "word_plus_exclamation": [word_plus_exclamation],
35 },
36 )
37
38 return df
39
40
41@dag(
42 start_date=None,
43 schedule=None,
44 doc_md=__doc__,
45 description="PythonVirtualenvOperator",
46 render_template_as_native_obj=True,
47 default_args={
48 "owner": "airflow",
49 "retries": 0,
50 },
51 tags=["PythonVirtualenvOperator"],
52)
53def python_virtualenv_operator_dag():
54
55 @task
56 def upstream_task():
57 print(f"The python version in the upstream task is: {sys.version}")
58 print(f"The pandas version in the upstream task is: {pd.__version__}")
59 return {"num": 1, "word": "hello"}
60
61 my_isolated_task = PythonVirtualenvOperator(
62 task_id="my_isolated_task",
63 python_callable=my_isolated_function,
64 requirements=["pandas==1.5.1"],
65 op_kwargs={
66 # note that render_template_as_native_obj=True in the DAG definition
67 # to render num as an integer
68 "num": "{{ ti.xcom_pull(task_ids='upstream_task')['num']}}",
69 "word": "{{ ti.xcom_pull(task_ids='upstream_task')['word']}}",
70 },
71 )
72
73 @task
74 def downstream_task(arg):
75 print(f"The python version in the downstream task is: {sys.version}")
76 print(f"The pandas version in the downstream task is: {pd.__version__}")
77 return arg
78
79 chain(upstream_task(), my_isolated_task, downstream_task(my_isolated_task.output))
80
81
82python_virtualenv_operator_dag()

Since the requirements parameter of the PythonVirtualenvOperator is templatable, you can use Jinja templating to pass information at runtime. For example, you can use a Jinja template to install a different version of pandas for each run of the task.

from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonVirtualenvOperator

@task
def get_pandas_version():
    pandas_version = "1.5.1"  # retrieve the pandas version according to your logic
    return pandas_version

my_isolated_task = PythonVirtualenvOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,  # defined as in the previous examples
    requirements=[
        "pandas=={{ ti.xcom_pull(task_ids='get_pandas_version') }}",
    ],
)

chain(get_pandas_version(), my_isolated_task)

If your task requires a different Python version than your Airflow environment, you need to install the Python version your task requires in your Airflow environment so the Virtualenv task can use it. Use the Astronomer PYENV BuildKit to install a different Python version in your Dockerfile.

# syntax=quay.io/astronomer/airflow-extensions:v1

FROM quay.io/astronomer/astro-runtime:10.3.0-python-3.11

PYENV 3.10 pyenv_3_10

To use the BuildKit, the Docker BuildKit Backend needs to be enabled. This is the default starting in Docker Desktop version 23.0, but might need to be enabled manually in older versions of Docker.

The Python version can then be referenced directly using the python_version parameter of the decorator or operator.

from airflow.decorators import task

@task.virtualenv(
    requirements=["pandas==1.5.1"],
    python_version="3.10",  # specify the Python version
)
def my_isolated_task():
    # import packages used in the isolated environment inside the function
    import sys
    import pandas as pd

    print(f"The python version in the virtual env is: {sys.version}")
    print(f"The pandas version in the virtual env is: {pd.__version__}")
    # your code to run in the isolated environment
Traditional
from airflow.operators.python import PythonVirtualenvOperator

def my_isolated_function():
    # import packages used in the isolated environment inside the function
    import sys
    import pandas as pd

    print(f"The python version in the virtual env is: {sys.version}")
    print(f"The pandas version in the virtual env is: {pd.__version__}")
    # your code to run in the isolated environment

my_isolated_task = PythonVirtualenvOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    requirements=["pandas==1.5.1"],
    python_version="3.10",  # specify the Python version
)

To get a list of all parameters of the @task.virtualenv decorator or PythonVirtualenvOperator, see the Astronomer Registry.

Kubernetes pod operator

The Kubernetes operator (the @task.kubernetes decorator or the KubernetesPodOperator) runs an Airflow task in a dedicated Kubernetes pod. You can use the @task.kubernetes decorator to run custom Python code in a separate Kubernetes pod using a Docker image with Python installed, while the KubernetesPodOperator runs any existing Docker image.

To use the @task.kubernetes decorator or the KubernetesPodOperator, you need to provide a Docker image and have access to a Kubernetes cluster. The following example shows how to use the modules to run a task in a separate Kubernetes pod in the same namespace and Kubernetes cluster as your Airflow environment. For more information on how to use the KubernetesPodOperator, see Use the KubernetesPodOperator and Run the KubernetesPodOperator on Astro.

from airflow.decorators import task
from airflow.configuration import conf

# if you are running Airflow on Kubernetes, you can get
# the current namespace from the Airflow conf
namespace = conf.get("kubernetes", "NAMESPACE")

@task.kubernetes(
    image="<YOUR IMAGE>",
    in_cluster=True,
    namespace=namespace,
    name="<YOUR POD NAME>",
    get_logs=True,
    log_events_on_failure=True,
    do_xcom_push=True,
)
def my_isolated_task(num: int):
    return num + 1
Traditional
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.configuration import conf

# if you are running Airflow on Kubernetes, you can get
# the current namespace from the Airflow conf
namespace = conf.get("kubernetes", "NAMESPACE")

my_isolated_task = KubernetesPodOperator(
    task_id="my_isolated_task",
    namespace=namespace,
    # your Docker image contains the scripts to run in the isolated environment
    image="<YOUR IMAGE>",
    name="<YOUR POD NAME>",
    in_cluster=True,
    is_delete_operator_pod=True,
    get_logs=True,
)

Virtual branching operators

Virtual branching operators allow you to run conditional task logic in an isolated Python environment.

  • @task.branch_external_python decorator or BranchExternalPythonOperator: Run conditional task logic in an existing virtual Python environment.
  • @task.branch_virtualenv decorator or BranchPythonVirtualenvOperator: Run conditional task logic in a newly created virtual Python environment.

To run conditional task logic in an isolated environment, use the branching versions of the virtual environment decorators and operators, as shown in the following examples and the downstream wiring sketch after them. You can learn more about branching in Airflow in the Branching in Airflow guide.

@task.branch_external_python
from airflow.decorators import task
import os

@task.branch_external_python(python=os.environ["ASTRO_PYENV_epo_pyenv"])
def my_isolated_task():
    # import packages used in the isolated environment inside the function
    import random
    import pandas as pd

    print(f"The pandas version in the virtual env is: {pd.__version__}")

    num = random.randint(0, 100)

    if num > 50:
        # return the task_id of the downstream task that should be executed
        return "downstream_task_a"
    else:
        return "downstream_task_b"
BranchExternalPythonOperator
from airflow.operators.python import BranchExternalPythonOperator
import os

def my_isolated_function():
    # import packages used in the isolated environment inside the function
    import random
    import pandas as pd

    print(f"The pandas version in the virtual env is: {pd.__version__}")

    num = random.randint(0, 100)

    if num > 50:
        # return the task_id of the downstream task that should be executed
        return "downstream_task_a"
    else:
        return "downstream_task_b"

my_isolated_task = BranchExternalPythonOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    python=os.environ["ASTRO_PYENV_epo_pyenv"],
)
@task.branch_virtualenv
from airflow.decorators import task

@task.branch_virtualenv(requirements=["pandas==1.5.3"])
def my_isolated_task():
    # import packages used in the isolated environment inside the function
    import random
    import pandas as pd

    print(f"The pandas version in the virtual env is: {pd.__version__}")

    num = random.randint(0, 100)

    if num > 50:
        # return the task_id of the downstream task that should be executed
        return "downstream_task_a"
    else:
        return "downstream_task_b"
BranchPythonVirtualenvOperator
from airflow.operators.python import BranchPythonVirtualenvOperator

def my_isolated_function():
    # import packages used in the isolated environment inside the function
    import random
    import pandas as pd

    print(f"The pandas version in the virtual env is: {pd.__version__}")

    num = random.randint(0, 100)

    if num > 50:
        # return the task_id of the downstream task that should be executed
        return "downstream_task_a"
    else:
        return "downstream_task_b"

my_isolated_task = BranchPythonVirtualenvOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    requirements=["pandas==1.5.1"],
)
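
Whichever variant you choose, the returned task_id must belong to a task in the same DAG. The following minimal sketch wires up the downstream tasks for the operator examples above, using illustrative EmptyOperator placeholders:

from airflow.operators.empty import EmptyOperator

# placeholder downstream tasks; only the task whose task_id is
# returned by my_isolated_task runs, the other one is skipped
downstream_task_a = EmptyOperator(task_id="downstream_task_a")
downstream_task_b = EmptyOperator(task_id="downstream_task_b")

my_isolated_task >> [downstream_task_a, downstream_task_b]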

Use Airflow context variables in isolated environments

Some variables from the Airflow context can be passed to isolated environments, for example the logical_date of the DAG run. Due to compatibility issues, other objects from the context such as ti cannot be passed to isolated environments. For more information, see the Airflow documentation.

@task.external_python
from airflow.decorators import task
import os

# note that to be able to use the logical date, pendulum needs to be installed in the epo_pyenv
@task.external_python(python=os.environ["ASTRO_PYENV_epo_pyenv"])
def my_isolated_task(logical_date):
    print(f"The logical date is: {logical_date}")
    # your code to run in the isolated environment

my_isolated_task()
ExternalPythonOperator
from airflow.operators.python import ExternalPythonOperator
import os

def my_isolated_function(logical_date_from_op_kwargs):
    print(f"The logical date is: {logical_date_from_op_kwargs}")
    # your code to run in the isolated environment

my_isolated_task = ExternalPythonOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    # note that to be able to use the logical date, pendulum needs to be installed in the epo_pyenv
    python=os.environ["ASTRO_PYENV_epo_pyenv"],
    op_kwargs={
        "logical_date_from_op_kwargs": "{{ logical_date }}",
    },
)
@task.virtualenv
from airflow.decorators import task

@task.virtualenv(
    requirements=[
        "pandas==1.5.1",
        "pendulum==3.0.0",
    ],  # pendulum is needed to use the logical date
)
def my_isolated_task(logical_date):
    print(f"The logical date is: {logical_date}")
    # your code to run in the isolated environment
PythonVirtualenvOperator
from airflow.operators.python import PythonVirtualenvOperator

def my_isolated_function(logical_date_from_op_kwargs):
    print(f"The logical date is: {logical_date_from_op_kwargs}")
    # your code to run in the isolated environment

my_isolated_task = PythonVirtualenvOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    requirements=[
        "pandas==1.5.1",
        "pendulum==3.0.0",
    ],  # pendulum is needed to use the logical date
    op_kwargs={
        "logical_date_from_op_kwargs": "{{ logical_date }}",
    },
)

Use Airflow variables in isolated environments

You can inject Airflow variables into isolated environments by using Jinja templating in the op_kwargs argument of the PythonVirtualenvOperator or ExternalPythonOperator. This strategy also lets you pass secrets into your isolated environment; they are masked in the logs according to the rules described in Hide sensitive information in Airflow variables.

PythonVirtualenvOperator
from airflow.operators.python import PythonVirtualenvOperator

def my_isolated_function(password_from_op_kwargs):
    print(f"The password is: {password_from_op_kwargs}")

my_isolated_task = PythonVirtualenvOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    requirements=["pandas==1.5.1"],
    python_version="3.10",
    op_kwargs={
        "password_from_op_kwargs": "{{ var.value.my_secret }}",
    },
)
ExternalPythonOperator
from airflow.operators.python import ExternalPythonOperator
import os

def my_isolated_function(password_from_op_kwargs):
    print(f"The password is: {password_from_op_kwargs}")

my_isolated_task = ExternalPythonOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    python=os.environ["ASTRO_PYENV_epo_pyenv"],
    op_kwargs={
        "password_from_op_kwargs": "{{ var.value.my_secret }}",
    },
)

Use Airflow packages in isolated environments

Using Airflow packages inside of isolated environments can lead to unexpected behavior and is not recommended.

If you need to use Airflow or an Airflow provider module inside your virtual environment, use the @task.virtualenv decorator or the PythonVirtualenvOperator instead of the @task.external_python decorator or the ExternalPythonOperator. As of Airflow 2.8, you can cache the virtual environment for reuse by providing a venv_cache_path to the @task.virtualenv decorator or PythonVirtualenvOperator, to speed up subsequent runs of your task.

from airflow.decorators import task

@task.virtualenv(
    requirements=[
        "apache-airflow-providers-snowflake==5.3.0",
        "apache-airflow==2.8.1",
        "pandas==1.5.3",
    ],
    venv_cache_path="/tmp/venv_cache",  # optional caching of the virtual environment
)
def my_isolated_task():
    # import packages used in the isolated environment inside the function
    import pandas as pd
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

    hook = SnowflakeHook(snowflake_conn_id="MY_SNOWFLAKE_CONN_ID")
    result = hook.get_first("SELECT * FROM MY_TABLE LIMIT 1")
    print(f"The pandas version in the virtual env is: {pd.__version__}")

    return result

my_isolated_task()
Traditional
from airflow.operators.python import PythonVirtualenvOperator

def my_isolated_function():
    # import packages used in the isolated environment inside the function
    import pandas as pd
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

    hook = SnowflakeHook(snowflake_conn_id="MY_SNOWFLAKE_CONN_ID")
    result = hook.get_first("SELECT * FROM MY_TABLE LIMIT 1")
    print(f"The pandas version in the virtual env is: {pd.__version__}")

    return result

my_isolated_task = PythonVirtualenvOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    requirements=[
        "pandas==1.5.3",
        "apache-airflow==2.8.1",
        "apache-airflow-providers-snowflake==5.3.0",
    ],
    venv_cache_path="/tmp/venv_cache",  # optional caching of the virtual environment
)