from datetime import datetime, timedelta

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.sdk import chain, dag

# adjust these to match your existing Snowflake tables, stage, and connections
_IN_REVENUE_TABLE = "PROD.CDP.IN_REVENUE"
_IN_REVENUE_STAGE = "revenue_stage"
_SNOWFLAKE_CONN_ID = "snowflake_prod"
_AWS_CONN_ID = "aws_revenue_pipeline"

@dag(
    dag_id='s3_to_snowflake_elt',
    start_date=datetime(2025, 8, 23),
    schedule='@daily',
    default_args={
        "retries": 3,
        "retry_delay": timedelta(seconds=20),
    },
    template_searchpath=["include/sql"],
    tags=['elt', 's3', 'snowflake'],
)
def s3_to_snowflake_elt():
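    """
    Load the daily revenue CSV from the S3 external stage into Snowflake,
    then run a SQL transformation to build revenue metrics.
    """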
    # load the day's revenue CSV from the external S3 stage into the raw table
    _s3_to_snowflake = CopyFromExternalStageToSnowflakeOperator(
        task_id='s3_to_snowflake',
        snowflake_conn_id=_SNOWFLAKE_CONN_ID,
        aws_conn_id=_AWS_CONN_ID,
        table=_IN_REVENUE_TABLE,
        stage=_IN_REVENUE_STAGE,
        files=["{{ ds_nodash }}_revenue_log.csv"],
        file_format="(type = 'CSV', field_delimiter = ',')",
    )
    # run the metrics SQL, resolved from template_searchpath (include/sql)
    _transform = SQLExecuteQueryOperator(
        task_id='transform_data',
        conn_id=_SNOWFLAKE_CONN_ID,
        sql="create_metrics_revenue.sql",
    )

    chain(_s3_to_snowflake, _transform)

s3_to_snowflake_elt()
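
# Optional: a quick local smoke test. Running `python <this file>` executes
# the DAG in a single process via dag.test(). This is a minimal sketch: it
# assumes the snowflake_prod and aws_revenue_pipeline connections are
# configured in your local Airflow environment and that include/sql/ is
# reachable from the project root. The scheduler skips this block when
# parsing the file.
if __name__ == "__main__":
    s3_to_snowflake_elt().test()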