Orchestrating Feature Pipelines: Announcing the Tecton Airflow Provider

  • Nick Acosta

Editor's note: This post originally appeared on the Tecton blog.

Apache Airflow® is an open-source workflow management platform that data engineering teams frequently use to orchestrate ETL pipelines. Airflow serves as the central control point for data operations. For example, if you’re trying to move sales data from Salesforce to Snowflake and transform it so that it can be viewed in Tableau, and you are using many different cloud services to do that, Airflow manages how and when each of these individual tasks runs and how they are composed together.

Tecton automatically orchestrates the compute and storage services necessary to transform data to features for production machine learning. Tecton customers already using Airflow may want to bring Tecton jobs into Airflow, so that there is a central location within the organization for every data pipeline. Depop is one such customer (you can read more about their implementation of Tecton and Airflow here). To facilitate that implementation, Tecton is excited to announce the Tecton Airflow provider!

Triggered Materialization and Monitoring

The Tecton Airflow provider is the best way to manage Tecton resources in Airflow. There are 5 ways to interact with Tecton within Airflow DAGs:

1. TectonSensor

A TectonSensor waits for Feature View/Feature Service data to materialize. This Sensor blocks the tasks following it in an Airflow DAG until materialization jobs for the particular Feature View/Feature Service have finished. After a TectonSensor, an Airflow DAG may kick off the training or re-training of a machine learning model, or any number of other downstream tasks, because the offline and online stores will now be up to date. Some developers using Tecton and Airflow use two TectonSensors for each Feature Service, so that they can be alerted separately when the offline store and the online store are ready. TectonSensor is often combined with TectonMaterializationOperator or TectonFeatureTableIngestOperator.

wait_for_feature_service_online = TectonSensor(
    task_id="wait_for_fs_online",
    workspace=WORKSPACE,
    feature_service=FEATURE_SERVICE,
    online=True,
    offline=False,
)
wait_for_feature_service_offline = TectonSensor(
    task_id="wait_for_fs_offline",
    workspace=WORKSPACE,
    feature_service=FEATURE_SERVICE,
    online=False,
    offline=True,
)

2. TectonMaterializationOperator

Tecton begins materializing a Feature View when the TectonMaterializationOperator runs. By default, Tecton Feature View schedules are maintained in Tecton. Moving the materialization to Airflow, however, allows you to maintain the schedule for every data tool in a single service and compose the operation and schedule of many data tools together in a single pipeline. Triggering materializations from Airflow also allows you to define retry logic, error handling, and logging for Tecton and other components of your pipeline all within Airflow. With this Operator, however, Tecton will still manage retries.

tecton_materialization = TectonMaterializationOperator(
    task_id="trigger_tecton",
    workspace=WORKSPACE,
    feature_view=FEATURE_VIEW,
    online=True,
    offline=True,
    start_time=datetime.combine(datetime.now() - timedelta(days = 1), datetime.min.time()),
    end_time=datetime.combine(datetime.now(), datetime.min.time()),
)
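The start_time and end_time expressions above select the full calendar day before the current one, from midnight to midnight. The sketch below isolates that calculation using only the standard library. The helper name previous_day_window is hypothetical and not part of the Tecton provider, and passing a fixed timestamp instead of calling datetime.now() is an illustrative choice that makes the result reproducible.

```python
from datetime import datetime, time, timedelta
from typing import Tuple


def previous_day_window(now: datetime) -> Tuple[datetime, datetime]:
    """Return (start, end) covering the full calendar day before `now`.

    Hypothetical helper for illustration; it mirrors the
    datetime.combine(...) expressions used for start_time and end_time.
    """
    # Midnight at the start of the day that contains `now`.
    end = datetime.combine(now.date(), time.min)
    # Exactly one day earlier: midnight at the start of the previous day.
    start = end - timedelta(days=1)
    return start, end


start_time, end_time = previous_day_window(datetime(2023, 5, 17, 14, 30))
print(start_time, end_time)  # 2023-05-16 00:00:00 2023-05-17 00:00:00
```

Because the original expressions call datetime.now(), the window they produce depends on the moment the code is evaluated. Computing the window from an explicit timestamp makes it easier to reason about which day a given run will materialize.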

3. TectonJobOperator

Often the previous two Tecton Airflow operators are combined, running TectonMaterializationOperator and then TectonSensor immediately after. TectonJobOperator combines the two: it launches a Tecton job and waits for its completion. It can be used with an additional TectonSensor to monitor the status of the offline store and the online store separately.

tecton_job = TectonJobOperator(
    task_id="trigger_tecton",
    workspace=WORKSPACE,
    feature_view=FEATURE_VIEW,
    online=False,
    offline=True,
    start_time=datetime.combine(datetime.now() - timedelta(days = 1), datetime.min.time()),
    end_time=datetime.combine(datetime.now(), datetime.min.time()),
)
online_data_ready = TectonSensor(
    task_id="wait_for_online",
    workspace=WORKSPACE,
    feature_view=FEATURE_VIEW,
    online=True,
    offline=False,
)

4. TectonFeatureTableIngestOperator

A Feature Table allows you to ingest features into Tecton that you've already transformed outside of Tecton. TectonFeatureTableIngestOperator launches an ingestion job that takes an incoming pandas DataFrame containing transformed feature data and ingests it into Tecton. This Operator can be used if you have unpredictably arriving data but want Tecton to manage retries of jobs. If you already run Airflow pipelines that produce batch features and want to bring them to Tecton for online and/or offline serving, this Operator can be placed upstream for those pipelines and/or tasks.

tecton_trigger = TectonFeatureTableIngestOperator(
    task_id="trigger_tecton",
    workspace=WORKSPACE,
    feature_view=FEATURE_VIEW,
    online=True,
    offline=True,
    df_generator=generate_df,
    op_args=(1, 2, 3, 4),
)
data_ready = TectonSensor(
    task_id="wait_for_data",
    workspace=WORKSPACE,
    feature_view=FEATURE_VIEW,
    online=True,
    offline=True,
)

5. TectonFeatureTableJobOperator

TectonFeatureTableJobOperator combines the functionality of TectonFeatureTableIngestOperator and TectonSensor. This can be valuable in a testing situation where you are not yet ready for features to be materialized to an online store. Once a DAG with TectonFeatureTableJobOperator is operating properly for features offline, an additional TectonSensor can be added to the DAG to monitor the online store.

Setting up Tecton on Airflow with Astronomer

Here is a quick guide to get started with Tecton and Airflow that uses the Astro CLI from Astronomer. Homebrew, Docker Desktop and a Tecton account are the only prerequisites needed. After a few commands, you’ll have a local version of Airflow up and running with the Tecton Airflow provider installed.

brew install astro
mkdir astro
cd astro
astro dev init
echo "airflow-provider-tecton" >> requirements.txt
curl https://raw.githubusercontent.com/tecton-ai/airflow-provider-tecton/main/tecton_provider/example_dags/example_tecton.py >> dags/example_tecton.py
astro dev start

After these commands, Airflow will be up and running at http://localhost:8080/ with the Tecton Airflow provider and an example DAG. Log in with the username admin and password admin.

The example that was added to your dags folder needs to connect to Tecton to run. In the top menu of the Airflow UI, find “Admin”, select “Connections”, and click the “+” button to “Add a new record.” Connections allow Airflow to store credentials that are used to talk to external systems. A Tecton Connection will have a tecton_default Connection Id, a Tecton Connection Type, your Tecton account’s URL as the Host, and an API Key from a Tecton service account (assign the service account an Operator role for the Tecton workspace created in the next step).

Tecton has prepared an example Feature View to utilize for this demonstration.

mkdir tecton-airflow
cd tecton-airflow
tecton workspace create airflow --live
tecton init
curl https://raw.githubusercontent.com/PubChimps/airflow-provider-tecton/main/tecton_provider/example_dags/scheduled_fs.py >> features.py
tecton apply

If you would like to use your own Feature View, you may need to change existing Feature Views so that their materializations can be triggered manually. Running materializations on both Airflow and Tecton schedules can cause overlaps that produce TectonErrors. To prevent this, set the Feature View’s batch_trigger to BatchTriggerType.MANUAL, especially when testing a new Feature View in Airflow, so the materializations you trigger from Airflow don’t overlap with the materializations Tecton runs automatically. This is the only new consideration in your Tecton Feature View definition, but when using your own features, your Airflow DAG definition (example_tecton.py) will need some edits too.

In example_tecton.py, Airflow triggers a materialization for the WORKSPACE and FEATURE_VIEW defined; edit these if you are testing your own Feature Views. Finally, the TectonJobOperator takes a start_time and end_time for materialization. If you use an example other than the one provided, you may need to ensure that the data sources you are using have data between the start_time and end_time, and that this data has not already been materialized by Tecton.
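One way to reason about the "not already materialized" condition is as an interval-overlap check between the window you are about to request and the windows that have already been processed. The sketch below is plain Python and does not call Tecton. The names Window, overlaps, and is_safe_to_trigger are hypothetical, the list of materialized windows is supplied by hand, and treating windows as half-open intervals (start included, end excluded) is an assumption made for illustration.

```python
from datetime import datetime
from typing import Iterable, Tuple

# Assumption: a window is a half-open interval [start, end).
Window = Tuple[datetime, datetime]


def overlaps(a: Window, b: Window) -> bool:
    """Return True if two half-open time windows share any instant."""
    return a[0] < b[1] and b[0] < a[1]


def is_safe_to_trigger(requested: Window, already_materialized: Iterable[Window]) -> bool:
    """Return True if `requested` overlaps none of the given windows."""
    return not any(overlaps(requested, done) for done in already_materialized)


# Hypothetical record of a window that has already been materialized.
done = [(datetime(2023, 5, 15), datetime(2023, 5, 16))]

# Adjacent windows only touch at midnight, so they do not overlap.
print(is_safe_to_trigger((datetime(2023, 5, 16), datetime(2023, 5, 17)), done))  # True

# This window starts at noon on May 15 and overlaps the recorded one by 12 hours.
print(is_safe_to_trigger((datetime(2023, 5, 15, 12), datetime(2023, 5, 16, 12)), done))  # False
```

A check like this could run in a task ahead of the trigger so that a DAG fails early with a clear message instead of surfacing an error from the materialization job.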

Now Airflow is all set up to orchestrate your Tecton feature pipelines! With Airflow, you can trigger materializations before and/or after other components of your data pipelines, ingest features Airflow is already preparing into Tecton feature tables, and monitor materialization to your offline and online store independently. Sign up for a free Tecton trial today! And stay tuned here for more information on Tecton and Airflow, including how to incorporate the two technologies together with data transformation with dbt!
