from airflow.sdk import dag, chain
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime, timedelta

# Adjust these to match your existing tables, stage, and connections
_IN_REVENUE_TABLE = "PROD.CDP.IN_REVENUE"
_IN_REVENUE_STAGE = "revenue_stage"
_SNOWFLAKE_CONN_ID = "snowflake_prod"

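# Daily ELT pipeline: copy each day's revenue CSV from the S3 stage into Snowflake,
# then run a SQL transformation to build revenue metrics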
@dag(
    dag_id='s3_to_snowflake_elt',
    start_date=datetime(2025, 8, 23),
    schedule='@daily',
    default_args={
        "retries": 3,
        "retry_delay": timedelta(seconds=20)
    },
    template_searchpath=["include/sql"],
    tags=['elt', 's3', 'snowflake']
)
def s3_to_snowflake_elt():

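    # Copy the day's revenue file ({{ ds_nodash }}_revenue_log.csv) from the
    # S3 external stage into the raw landing table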
    _s3_to_snowflake = CopyFromExternalStageToSnowflakeOperator(
        task_id='s3_to_snowflake',
        snowflake_conn_id=_SNOWFLAKE_CONN_ID,
        # S3 credentials are configured on the Snowflake external stage itself
        table=_IN_REVENUE_TABLE,
        stage=_IN_REVENUE_STAGE,
        files=["{{ ds_nodash }}_revenue_log.csv"],
        file_format="(type = 'CSV', field_delimiter = ',')",
    )

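    # Transform the loaded data; the SQL file is resolved via template_searchpath (include/sql)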
    _transform = SQLExecuteQueryOperator(
        task_id='transform_data',
        conn_id=_SNOWFLAKE_CONN_ID,
        sql="create_metrics_revenue.sql",
    )

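    # Run the load before the transformation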
    chain(_s3_to_snowflake, _transform)

s3_to_snowflake_elt()