import json

from airflow.sdk import dag, Asset, AssetWatcher, task
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from include.agent_tools import reports_revenue, reports_usage, reports_marketing
from pydantic_ai import Agent


def apply_function(*args, **kwargs):
    """Parse the JSON body of a queue message and return the decoded value.

    The message is taken from the last positional argument and must expose a
    ``value()`` method returning JSON text. Keyword arguments are accepted
    but not used.

    Returns:
        The object produced by ``json.loads``; it is also printed to stdout.

    Raises:
        IndexError: If no positional argument is supplied.
        json.JSONDecodeError: If the message body is not valid JSON.
    """
    # Only the trailing positional argument matters; anything before it is ignored.
    raw_body = args[-1].value()
    val = json.loads(raw_body)
    print(f"Value in message is {val}")
    return val


# Message-queue trigger used by the asset watcher defined below. The
# `apply_function` argument is a dotted-path string; it presumably resolves to
# the `apply_function` defined above (assumes this module is importable as
# `dags.ad_hoc_report_gen` — TODO confirm against the deployment layout).
trigger = MessageQueueTrigger(
    apply_function="dags.ad_hoc_report_gen.apply_function",
    queue="kafka://localhost:9092/report",
)

# Asset that the DAG below is scheduled on. A single watcher named
# "kafka_watcher" is attached to it and is driven by `trigger`.
kafka_topic_asset = Asset(
    "kafka_topic_asset",
    watchers=[
        AssetWatcher(trigger=trigger, name="kafka_watcher"),
    ],
)

# Agent handed to the `ad_hoc_report` task below. It is built with the
# identifier "gpt-4o-mini", a data-analyst system prompt, and the three
# reporting tools imported from `include.agent_tools`.
data_report_agent = Agent(
    "gpt-4o-mini",
    tools=[
        reports_revenue,
        reports_usage,
        reports_marketing,
    ],
    system_prompt="""
    You are a data analyst creating a report based on available information from the reporting tables accessible using your tools.
    """,
)



@dag(schedule=[kafka_topic_asset])
def ad_hoc_report_gen():
    """Produce an ad hoc report for a request arriving via ``kafka_topic_asset``.

    Task flow: read the request from the triggering asset event, pass it to
    ``data_report_agent``, then hand the agent's output to ``post_response``.
    """

    # Extract the request text from the asset event(s) that triggered this run.
    @task
    def fetch_message_from_kafka(**context):
        triggering_asset_events = context["triggering_asset_events"]
        for event in triggering_asset_events[kafka_topic_asset]:
            print(f"Processing message: {event}")
            # NOTE(review): returning inside the loop means only the first
            # event is used; any further events for this run are ignored.
            # Presumably "payload" holds the value returned by the trigger's
            # apply_function, so the message JSON must carry a "content" key
            # — confirm against the producer.
            return event.extra["payload"]["content"]
        # No triggering events for this asset: nothing to report on.
        return None

    _fetch_message_from_kafka = fetch_message_from_kafka()

    # Agent task: the function simply forwards the request unchanged.
    # Presumably the decorator sends the returned value to
    # `data_report_agent` — confirm against the decorator's documentation.
    @task.agent(agent=data_report_agent)
    def ad_hoc_report(request_info) -> str:
        return request_info

    _ad_hoc_report = ad_hoc_report(request_info=_fetch_message_from_kafka)

    # Deliver the agent's output through the project's alert helper.
    @task
    def send_response(agent_output):
        # Imported inside the task body, so the helper module is only loaded
        # when the task actually runs.
        from include.alert_functions import post_response

        # Indentation fixed: this line previously mixed a tab and a space,
        # which makes Python reject the file with a TabError.
        post_response(agent_output)

    _send_response = send_response(agent_output=_ad_hoc_report)


# Call the decorated function at import time; with Airflow's @dag decorator
# this call is what instantiates the DAG (see the Airflow TaskFlow docs).
ad_hoc_report_gen()