Log manipulation and alteration with Logstash mutate filter

In a large ELK observability setup, ingesting roughly 250-300 GB of logs per day, parts of the logs had to be sent to an external service for data analysis. But to comply with data privacy, some parts of the log events had to be removed. At the same time the external service also required specific field names - which differed from the field names in the ELK logs.

Luckily there's the Logstash mutate filter, which allows you to modify and alter log events - until everyone's happy. But first, let's start at the beginning.

The ELK stack

The use of a so-called ELK stack (Elasticsearch + Logstash + Kibana) is nothing new in the observability world. The following simplified architecture might already be known to many.

A very important additional factor in this drawing is the locally installed Filebeat agent, which is told to use the nginx module to "watch" the logs located in /var/log/nginx/. With every HTTP request the relevant access logs are appended, read by Filebeat and sent to the central Logstash instance.
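
On the Logstash side, events from Filebeat are typically received through a beats input. A minimal sketch (5044 is the conventional default port for Beats):

input {
  beats {
    # Listen for incoming connections from Beats agents (such as Filebeat)
    port => 5044
  }
}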

Logstash itself might do some additional log parsing (in our case applying a grok filter to identify and match the Nginx logs) before the finalized event is sent to (and stored in) a large Elasticsearch cluster.
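
Such a grok filter could look like the following minimal sketch, assuming the default combined access log format. The actual filter in this setup uses a custom pattern, as our log events also contain additional fields such as useragent, tlsversion, tlscipher and countrycode:

filter {
  grok {
    # HTTPD_COMBINEDLOG matches the standard combined access log format and
    # (with ECS compatibility disabled) creates legacy field names such as
    # clientip, verb, request, response and bytes
    match => { "message" => "%{HTTPD_COMBINEDLOG}" }
  }
}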

This allows you to create some nice and fancy dashboards, representing the visits to your websites in real time.

Nginx web server observability dashboard in Kibana

Nginx access logs collected by Filebeat

When the access logs collected by Filebeat land in the Elasticsearch cluster, an event contains a lot of data. Not only data representing the entry in the Nginx access log, but also additional metadata.
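
Here's an illustration of what such an event can look like - all values are made up and only a subset of the fields is shown:

{
  "@timestamp": "2024-12-30T14:09:11.000Z",
  "agent": {
    "type": "filebeat",
    "hostname": "webserver01"
  },
  "log": {
    "file": {
      "path": "/var/log/nginx/www.example.com.access.log"
    }
  },
  "host": "webserver01",
  "clientip": "192.0.2.10",
  "verb": "GET",
  "request": "/",
  "httpversion": "1.1",
  "response": "200",
  "bytes": "298592",
  "referrer": "\"-\"",
  "useragent": "\"Mozilla/5.0 ...\"",
  "countrycode": "CH",
  "message": "192.0.2.10 - - [30/Dec/2024:15:09:11 +0100] \"GET / HTTP/1.1\" 200 298592 ..."
}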

The metadata contains information about the Filebeat agent itself (fields with the agent prefix), which log file was parsed and more.

There's also the field "clientip" which we don't want to ship to the external analysis provider - obviously for privacy reasons.

In order to get rid of all the unnecessary data, we can use the mutate filter inside the Logstash configuration.

Log event manipulation with mutate filter

From the external analysis provider we received the following mandatory fields:

  • host
  • url
  • request_user_agent
  • response_status

However, the fields currently received by Logstash are named differently. Furthermore the "host" field is added by Filebeat to describe the host (web server) from which the log file originated - so it conflicts with the "host" field the provider expects. And, as mentioned before, we need to get rid of the "clientip" field for privacy reasons.

In order to achieve this, the log event needs to be changed.

The Logstash mutate filter is a powerful filter to manipulate log events. Each field (and its corresponding value) can be handled, additional fields can be added, certain fields can be removed and more.

In the following example I chose to apply the mutate filter only to events which were collected from a specific Nginx access log:

filter {
[...]
  if [log][file][path] == "/var/log/nginx/www.example.com.access.log" {
    mutate {
      rename => {
        "useragent" => "request_user_agent"
        "response" => "response_status"
        "bytes" => "response_body_size"
        "verb" => "request_method"
        "countrycode" => "geo_country"
        "host" => "webserver"
      }
      add_field => {
        "[@metadata][target]" => "externalanalysis"
        "host" => "www.example.com"
        "url" => "https://%{host}%{request}"
      }
      remove_field => [ "@timestamp", "@version", "tags", "type", "agent", "log", "ecs", "input", "error", "auth", "ident", "request", "referrer", "httpversion", "tlsversion", "tlscipher", "clientip", "message", "webserver" ]
    }
  }
[...]
}

Here are some details:

  • if [log][file][path] condition => This if condition tells Logstash to only apply the following mutate filter on events matching the condition - in this case only on events where the log.file.path field has the value "/var/log/nginx/www.example.com.access.log".
  • rename => Rename a field to a new name. In this example the field "useragent" is renamed to "request_user_agent" (as requested by the external analysis provider). Note that the original "host" field is renamed to "webserver" first: this frees up the "host" name for the new value added by add_field, and the "webserver" field itself is dropped again in remove_field.
  • add_field => Add additional fields with a specific value to the log event. To use the value of existing fields, field references can be used (e.g. %{host}). The fields are added in the listed order, so %{host} in the "url" value already resolves to the newly added "host" field.
  • remove_field => Remove fields from the log event. In this example almost all fields, including metadata fields, are removed from the log event. Note that remove_field is applied after add_field, which is why the "request" field can still be referenced in the "url" value before being removed.

Important note: The mutate filter only works within the filter {} context!

Data verification with local log file

Before shipping the manipulated log events from that specific access log to the external analysis provider, let's verify that the data looks correct. The easiest way to achieve this is to use a local log file as Logstash output.

As the mutate filter above added a new meta data field ([@metadata][target]), we can create another if condition within the output {} context to specify a local log file as output:

output {
[...]
  if [@metadata][target] == "externalanalysis" {
    file { path => "/tmp/nginx.log" }
  }
[...]
}
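
Alternatively, for a quick test run, a stdout output with the rubydebug codec prints the events in a human-readable format to the console:

output {
[...]
  if [@metadata][target] == "externalanalysis" {
    # Print the manipulated events to the console for debugging
    stdout { codec => rubydebug }
  }
[...]
}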

After a Logstash restart, the new log file /tmp/nginx.log is filled with events in JSON format (the file output plugin uses the json_lines codec by default).

Thanks to the mutate filter the log events contain much less data than the original (Filebeat) log events:

root@elk:~# tail -n 1 /tmp/nginx.log  | jq
{
  "host": "www.example.com",
  "request_method": "GET",
  "response_status": "200",
  "url": "https://www.example.com/",
  "request_user_agent": "\"Mozilla/5.0 (X11; Linux x86_64; rv:133.0) Gecko/20100101 Firefox/133.0\"",
  "response_body_size": "298592",
  "timestamp": "30/Dec/2024:15:09:11 +0100",
  "geo_country": "CH"
}

As the output shows, the data fields are named as requested by the external analysis provider and the unnecessary data has been removed from the log event.

The final step was to send these log events to the external provider's API, using Logstash's http output plugin.
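
A minimal sketch of such an http output, re-using the same [@metadata][target] condition - note that the API URL and the Authorization header value are hypothetical placeholders:

output {
[...]
  if [@metadata][target] == "externalanalysis" {
    http {
      # Hypothetical API endpoint of the external analysis provider
      url => "https://api.analysis-provider.example/ingest"
      http_method => "post"
      format => "json"
      headers => { "Authorization" => "Bearer REPLACE_ME" }
    }
  }
[...]
}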

