# Airflow 2 legacy DAG
from airflow.decorators import dag, task
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.datasets import Dataset

_URL = "internal.api/run"


@dag(
    start_date=days_ago(1),
    schedule_interval="@daily",
    default_view="grid",
    catchup=False,
)
def run_command():

    @task
    def get_command(execution_date):
        import requests

        response = requests.get(_URL)
        response_json = response.json()
        response_json["timestamp"] = execution_date.isoformat()
        return response_json

    _get_command = get_command()

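    # The f-string below renders the XComArg returned by get_command() as a Jinja
    # xcom_pull template, so the pulled value is substituted into bash_command at runtime.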
    _run_command = BashOperator(
        task_id="run_command",
        bash_command=f"{_get_command}",
        outlets=[Dataset("command_run")],
    )

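    # Because the XComArg was converted to a plain template string above, Airflow
    # cannot infer the dependency automatically, so it is set explicitly.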
    _get_command >> _run_command


run_command()

# Updated to the new Airflow 3 imports
from airflow.sdk import dag, task, Asset, chain
from airflow.providers.standard.operators.bash import BashOperator
from pendulum import datetime

_URL = "internal.api/run"

@dag(
    start_date=datetime(2025, 8, 23),  # days_ago was removed in Airflow 3
    schedule="@daily",  # schedule_interval was removed in Airflow 3
    # catchup is set to False by default in Airflow 3
)
def run_command():

    @task
    def get_command(logical_date):  # logical_date replaces execution_date, which was removed in Airflow 3
        import requests

        response = requests.get(_URL)
        response_json = response.json()
        response_json["timestamp"] = logical_date.isoformat()
        return response_json

    _get_command = get_command()

    _run_command = BashOperator(
        task_id="run_command",
        bash_command=f"{_get_command}",
        outlets=[Asset("command_run")],  # Airflow 3 uses Asset instead of Dataset
    )

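    # chain() from airflow.sdk sets the dependency explicitly, equivalent to
    # _get_command >> _run_command.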
    chain(_get_command, _run_command)

run_command()
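

# Illustrative sketch (not part of the original migration): a downstream DAG that is
# asset-triggered whenever run_command updates Asset("command_run") via the outlet
# above. The DAG and task names here are assumptions for the example.
@dag(
    start_date=datetime(2025, 8, 23),
    schedule=[Asset("command_run")],  # run whenever the asset receives an update event
)
def downstream_report():

    @task
    def report():
        print("Asset 'command_run' was updated")

    report()


downstream_report()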