Configure task log collection and exporting to ElasticSearch

Airflow task logs are stored in a logging backend to ensure you can access them after your Pods terminate. By default, Astronomer uses Vector to collect task logs and export them to an ElasticSearch instance.

You can configure how Astronomer collects Deployment task logs and exports them to ElasticSearch. The following are the supported methods for exporting task logs to ElasticSearch:

  • Using a DaemonSet Pod on each Kubernetes node in your cluster.
  • Using container sidecars for Deployment components.

Export task logs using a Vector DaemonSet

Exporting task logs using a Vector DaemonSet is not supported for Airflow 3.

By default, Astro Private Cloud uses a Vector DaemonSet to aggregate task logs. This is the workflow for the default implementation:

  • Deployments write task logs to stdout.
  • Kubernetes takes the output from stdout and writes it to the Deployment’s node.
  • A Vector Pod reads logs from the node and forwards them to ElasticSearch.

Astronomer recommends using a Vector DaemonSet for organizations that:

  • Run longer tasks using the Celery executor.
  • Run Astro Private Cloud in a dedicated cluster.
  • Run privileged containers in a cluster with a ClusterRole.

This approach is not suited for organizations that don’t allow logging containers to run in privileged mode, or that run many small tasks using the Kubernetes executor. Because task logs exist only for the lifetime of the Pod, your Pods running small tasks might complete before Vector can collect their task logs.

Export logs using container sidecars

You can use a logging sidecar container to collect and export logs. In this implementation:

  • Each container running an Airflow component for a Deployment receives its own Vector sidecar.
  • Task logs are written to a shared directory.
  • The Vector sidecar reads logs from the shared directory and writes them to ElasticSearch.

This implementation is recommended for organizations that:

  • Run Astro Private Cloud in a multi-tenant cluster, where security is a concern.
  • Use the KubernetesExecutor to run many short-lived tasks, which requires improved reliability.

Configure logging sidecars

  1. Retrieve your values.yaml file. See Apply a config change.

  2. Add the following entry to your values.yaml file:

    global:
      vectorEnabled: false
      loggingSidecar:
        enabled: true
        name: sidecar-log-consumer

    If you’re migrating from Fluentd, you must also set the following configuration so that Astro Private Cloud can retain logs:

    global:
      logging:
        indexNamePrefix: <your-index-prefix>

  3. Push the configuration change. See Apply a config change.

Customize Vector logging sidecars

You can customize the default Astronomer Vector logging sidecar to have different transformations and sinks based on your team’s requirements. This is useful if you want to annotate, customize, or filter your logs before sending them to your logging platform.

  1. In the Astro UI, go to your Clusters page and select your cluster.

  2. In the cluster details, click Edit in the Deployment Configuration section and add the override below in the Configuration Override field:

    global:
      loggingSidecar:
        enabled: true
        name: sidecar-log-consumer
        customConfig: true

    For more info on using the Configuration Override in the UI, see Override base configuration.

  3. Save and apply your changes in the UI.

Pushing this change updates the configuration for the Cluster. Individual deployments will receive the new sidecar logging configuration once they are redeployed.

  1. Create a custom Vector configuration YAML file to change how and where sidecars forward your logs. The following example is a template configuration that forwards logs to an Elasticsearch sink. For the complete default logging sidecar configmap, see the Astronomer GitHub.

    log_schema:
      timestamp_key: "@timestamp"
    data_dir: "${SIDECAR_LOGS}"
    sources:
      airflow_log_files:
        type: file
        include:
          - "${SIDECAR_LOGS}/*.log"
        read_from: beginning
    transforms:
      transform_airflow_logs:
        type: remap
        inputs:
          - airflow_log_files
        source: |
          .component = "${COMPONENT:--}"
          .workspace = "${WORKSPACE:--}"
          .release = "${RELEASE:--}"
          .date_nano = parse_timestamp!(.@timestamp, format: "%Y-%m-%dT%H:%M:%S.%f%Z")

      filter_common_logs:
        type: filter
        inputs:
          - transform_airflow_logs
        condition:
          type: "vrl"
          source: '!includes(["worker","scheduler"], .component)'

      filter_scheduler_logs:
        type: filter
        inputs:
          - transform_airflow_logs
        condition:
          type: "vrl"
          source: 'includes(["scheduler"], .component)'

      filter_worker_logs:
        type: filter
        inputs:
          - transform_airflow_logs
        condition:
          type: "vrl"
          source: 'includes(["worker"], .component)'

      filter_gitsyncrelay_logs:
        type: filter
        inputs:
          - transform_airflow_logs
        condition:
          type: "vrl"
          source: 'includes(["git-sync-relay"], .component)'

      transform_task_log:
        type: remap
        inputs:
          - filter_worker_logs
          - filter_scheduler_logs
        source: |-
          . = parse_json(.message) ?? .
          .@timestamp = parse_timestamp(.timestamp, "%Y-%m-%dT%H:%M:%S%Z") ?? now()
          .check_log_id = exists(.log_id)
          if .check_log_id != true {
            .log_id = join!([to_string!(.dag_id), to_string!(.task_id), to_string!(.execution_date), to_string!(.try_number)], "_")
          }
          .offset = to_int(now()) * 1000000000 + to_unix_timestamp(now()) * 1000000

      final_task_log:
        type: remap
        inputs:
          - transform_task_log
        source: |
          .component = "${COMPONENT:--}"
          .workspace = "${WORKSPACE:--}"
          .release = "${RELEASE:--}"
          .date_nano = parse_timestamp!(.@timestamp, format: "%Y-%m-%dT%H:%M:%S.%f%Z")

      transform_remove_fields:
        type: remap
        inputs:
          - final_task_log
          - filter_common_logs
          - filter_gitsyncrelay_logs
        source: |
          del(.host)
          del(.file)
    # Configuration for Elasticsearch sink.
    sinks:
      out:
        type: elasticsearch
        # Specify the transforms you want to run before your logs are exported.
        inputs:
          - transform_remove_fields
        mode: bulk
        compression: none
        endpoint: "http://example-host:<example-port>"
        auth:
          strategy: "basic"
          user: "example-user"
          password: "example-pass"
        bulk:
          index: "vector.${RELEASE:--}.%Y.%m.%d"
          action: create

  2. Run the following command to add the configuration file to your cluster as a Kubernetes secret:

    kubectl create secret generic sidecar-config --from-file=vector-values.yaml=vector-values.yaml

  3. Run the following command to annotate the secret so that it’s automatically applied to all new Deployments:

    kubectl annotate secret sidecar-config astronomer.io/commander-sync="platform-release=astronomer"

  4. Run the following command to sync existing Deployments with the new configuration:

    kubectl create job --from=cronjob/astronomer-config-syncer sync-secrets -n astronomer
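
Two values in the custom Vector configuration are derived rather than set directly: the fallback log_id, which the transform_task_log step builds by joining dag_id, task_id, execution_date, and try_number with underscores, and the Elasticsearch index name, which the sink builds from the RELEASE environment variable and a date template. The following shell sketch imitates both, which can help you predict what to search for in Elasticsearch. It is an approximation only: every sample value is hypothetical, and Vector fills the date template from each event's timestamp, while this sketch uses the current UTC date.

```shell
# Hypothetical sample values; real values come from your Airflow task logs.
dag_id="example_dag"
task_id="extract"
execution_date="2024-01-01T00:00:00+00:00"
try_number="1"

# Mirrors the join!([...], "_") call in transform_task_log.
log_id="${dag_id}_${task_id}_${execution_date}_${try_number}"

# Hypothetical release name. "${RELEASE:--}" falls back to "-" when
# RELEASE is unset or empty, the same fallback syntax the config uses.
RELEASE="example-release"
index_name="vector.${RELEASE:--}.$(date -u +%Y.%m.%d)"

echo "log_id: ${log_id}"
echo "index:  ${index_name}"
```

Because the index name starts with vector. and ends with a date, an index pattern such as vector.* matches the indices for every release and day.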

Use an external Elasticsearch instance for Airflow task log management

Add Airflow task logs from your Astronomer Deployment to an existing Elasticsearch instance on Elastic Cloud to centralize log management and analysis. Centralized log management allows you to quickly identify, troubleshoot, and resolve task failure issues. Although these examples use Elastic Cloud, you can also use AWS Managed OpenSearch Service or any other elastic service (managed or hosted). With an external Elasticsearch instance configured for Astro Private Cloud, you can see the logs in your Elasticsearch instance and browse the logs from the Astro Private Cloud UI.

If you use an existing Elasticsearch instance, make sure that the index template is configured to enable auto creation of new indices.

Create an Elastic Deployment and endpoint

  1. In your browser, go to https://cloud.elastic.co/ and create a new Elastic Cloud deployment. See Create a deployment.
  2. Copy and save your Elastic Cloud deployment credentials when the Save the deployment credentials screen appears.
  3. On the Elastic dashboard, click the Gear icon for your Deployment.
  4. Click Copy endpoint next to Elasticsearch.

  5. (Optional) Test the Elastic Cloud deployment endpoint:

    • Open a new browser window, paste the endpoint you copied in step 4 in the Address bar, and then press Enter.
    • Enter the username and password you copied in step 2 and click Sign in. Output similar to the following appears:
    {
      "name": "instance-0000000000",
      "cluster_name": "<cluster-name>",
      "cluster_uuid": "<cluster-uuid>",
      "version": {
        "number": "8.3.2",
        "build_type": "docker",
        "build_hash": "8b0b1f23fbebecc3c88e4464319dea8989f374fd",
        "build_date": "2022-07-06T15:15:15.901688194Z",
        "build_snapshot": false,
        "lucene_version": "9.2.0",
        "minimum_wire_compatibility_version": "7.17.0",
        "minimum_index_compatibility_version": "7.0.0"
      },
      "tagline": "You Know, for Search"
    }

Save your Elastic Cloud deployment credentials

After you’ve created an Elastic deployment and endpoint, you have two options for storing your Elastic deployment credentials. You can store the credentials in your Astro Private Cloud Helm values or, for greater security, as a secret in your Astro Private Cloud Kubernetes cluster. For additional information about applying an Astro Private Cloud configuration change, see Apply a config change.

  1. Run the following command to base64 encode your Elastic Cloud deployment credentials:

    echo -n "<username>:<password>" | base64

  2. Add the following entry to your values.yaml file:

    global:
      vectorEnabled: true
      customLogging:
        enabled: true
        scheme: https
        # Host endpoint copied from the Elasticsearch console, with the
        # https:// prefix and the port number removed.
        host: "<host-URL>"
        port: "9243"
        # Encoded credentials from step 1.
        secret: "<encoded credentials>"

  3. Add the following entry to your values.yaml file to disable internal logging:

    tags:
      logging: false

  4. Run the following command to apply the updated values.yaml file to your Astro Private Cloud release:

    helm upgrade -f values.yaml --version=0.27 --namespace=<your-platform-namespace> <your-platform-release-name> astronomer/astronomer
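
Because the secret value in your values.yaml file must decode back to exactly <username>:<password>, it can help to check the encoding before you run the upgrade. The following sketch assumes a POSIX shell with the base64 utility available and uses hypothetical credentials. It uses printf instead of echo -n so that no trailing newline ends up in the encoded value on shells where echo does not support -n.

```shell
# Hypothetical credentials; replace them with your Elastic Cloud deployment credentials.
ES_USER="elastic"
ES_PASSWORD="example-password"

# Encode "<username>:<password>" without a trailing newline.
encoded=$(printf '%s' "${ES_USER}:${ES_PASSWORD}" | base64)

# Decode again to confirm the round trip before pasting the value into values.yaml.
decoded=$(printf '%s' "${encoded}" | base64 --decode)

if [ "${decoded}" = "${ES_USER}:${ES_PASSWORD}" ]; then
  echo "secret: \"${encoded}\""
else
  echo "round trip failed" >&2
fi
```

If the check fails, the most likely causes are an extra newline or an unescaped special character in the password. Keep in mind that base64 is an encoding, not encryption, so treat the encoded value as sensitive.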

View Airflow task logs in Elastic

  1. On the Elastic dashboard in the Elasticsearch Service area, click the Deployment name.

  2. Click Menu > Discover. The Create index pattern screen appears.

  3. In the Name field, enter vector.* if you use Vector sidecar logging. In the Timestamp field, enter @timestamp, and then click Create index pattern.

  4. Click Menu > Dashboard to view all of the Airflow task logs for your Deployment on Astronomer.