# Airflow 2 legacy DAG
from airflow.decorators import dag, task
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.datasets import Dataset
_URL = "internal.api/run"
@dag(
start_date=days_ago(1),
schedule_interval="@daily",
default_view="grid",
catchup=False,
)
def run_command():
@task
def get_command(execution_date):
import requests
response = requests.get(_URL)
response_json = response.json()
response_json["timestamp"] = execution_date.isoformat()
return response_json
_get_command = get_command()
_run_command = BashOperator(
task_id="run_command",
bash_command=f"{_get_command}",
outlets=[Dataset("command_run")],
)
_get_command >> _run_command
run_command()
# Updated to new Airflow 3 imports
# from airflow.sdk import dag, task, Asset, chain
from airflow.providers.standard.operators.bash import BashOperator
from pendulum import datetime
_URL = "internal.api/run"
@dag(
start_date=datetime(2025, 8, 23), # days_ago was deprecated
schedule="@daily" # schedule_interval was removed
# catchup is set to False by default in Airflow 3
)
def run_command():
@task
def get_command(logical_date): # logical_date replaced the deprecated execution_date
import requests
response = requests.get(_URL)
response["timestamp"] = logical_date.isoformat()
return response.json()
_get_command = get_command()
_run_command = BashOperator(
task_id="run_command",
bash_command=f"{_get_command}",
outlets=[Asset("command_run")] # Airflow 3 uses Asset instead of Dataset
)
chain(_get_command, _run_command)
run_command()