import json

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, task
from pydantic_ai import Agent

from include.agent_tools import reports_marketing, reports_revenue, reports_usage
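

# Callback for the Kafka trigger: the consumed message arrives as the last
# positional argument; return its deserialized JSON value.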
def apply_function(*args, **kwargs):
    message = args[-1]
    val = json.loads(message.value())
    print(f"Value in message is {val}")
    return val
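

# Watch the `report` topic on the local Kafka broker; apply_function is
# referenced by its dotted path so the triggerer can import it.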
trigger = MessageQueueTrigger(
    queue="kafka://localhost:9092/report",
    apply_function="dags.ad_hoc_report_gen.apply_function",
)
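
# The watcher turns every trigger firing into an asset event, so any DAG
# scheduled on this asset runs when a new message arrives.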
kafka_topic_asset = Asset(
    "kafka_topic_asset", watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)]
)
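
# Pydantic AI agent that writes the report. The three reporting tools are
# plain callables defined in include/agent_tools.py (not shown here).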
data_report_agent = Agent(
    "gpt-4o-mini",
    system_prompt=(
        "You are a data analyst creating a report based on available "
        "information from the reporting tables accessible using your tools."
    ),
    tools=[reports_revenue, reports_usage, reports_marketing],
)
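
# Example message on the `report` topic (assumed shape, inferred from the
# `payload["content"]` lookup in fetch_message_from_kafka below); it could be
# produced with e.g. kcat:
#   echo '{"content": "Summarize Q2 revenue by region"}' | kcat -P -b localhost:9092 -t report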


@dag(schedule=[kafka_topic_asset])
def ad_hoc_report_gen():
    @task
    def fetch_message_from_kafka(**context):
        triggering_asset_events = context["triggering_asset_events"]
        for event in triggering_asset_events[kafka_topic_asset]:
            print(f"Processing message: {event}")
            # Return the payload of the first triggering event.
            return event.extra["payload"]["content"]

    _fetch_message_from_kafka = fetch_message_from_kafka()
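
    # @task.agent is provided by the airflow-ai-sdk package; it executes the
    # Pydantic AI agent, and the decorated function only builds the prompt.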
    @task.agent(agent=data_report_agent)
    def ad_hoc_report(request_info) -> str:
        return request_info

    _ad_hoc_report = ad_hoc_report(request_info=_fetch_message_from_kafka)
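
    # Importing post_response inside the task defers the import to runtime,
    # keeping DAG parsing light.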
    @task
    def send_response(agent_output):
        from include.alert_functions import post_response

        post_response(agent_output)

    _send_response = send_response(agent_output=_ad_hoc_report)


ad_hoc_report_gen()