Fluentd — Log Collection & Aggregation Guide

DodaTech | Updated 2026-06-24 | 6 min read

This tutorial covers Fluentd's core building blocks (inputs, parsers, filters, outputs, and buffers), with practical configuration examples, a Kubernetes deployment, and common mistakes to avoid.

Fluentd is an open-source data collector for unified logging. It lets you collect logs from hundreds of sources, process them in real time, and forward them to multiple destinations (Elasticsearch, S3, Kafka, Grafana Loki) through a plugin ecosystem.

Why It Matters

Applications and infrastructure produce logs in different formats, locations, and cadences. Without a unified logging layer, you must configure each log source separately for each destination. Fluentd provides a single configuration file that collects all logs, parses them into a structured format, and routes them to any number of outputs. DodaTech processes 5 TB of logs daily through Fluentd, routing application logs to Elasticsearch, infrastructure logs to S3, and security audit logs to a dedicated SIEM.

Real-World Use

DodaZIP's Kubernetes cluster runs Fluentd as a DaemonSet on every node. It collects container stdout/stderr logs, Kubernetes events, and node system logs, adds Kubernetes metadata (namespace, pod name, container name), and forwards structured JSON logs to Elasticsearch for Kibana visualization and to S3 for long-term archival.

flowchart TD
    A[Log Sources] --> B[Fluentd Agent]
    C[Docker Containers] --> B
    D[System Logs] --> B
    E[Application Files] --> B
    B --> F[Input Plugin]
    F --> G[Parser Plugin]
    G --> H[Filter Plugin]
    H --> I[Output Plugin]
    I --> J[Elasticsearch]
    I --> K[S3]
    I --> L[Kafka]
    I --> M[Loki]
    B --> N[Buffer Plugin]
    N --> I
    style B fill:#0E5C8A,color:#fff
â„šī¸ Info

Prerequisites: basic Linux administration and familiarity with logging patterns and the JSON format.

Installation

# Install Fluentd via Ruby gem
gem install fluentd --no-doc

# Or via fluent-package (the packaged Fluentd that succeeded td-agent)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-noble-fluent-package5-lts.sh | sh

# Verify
fluentd --version

# Expected output:
# fluentd 1.17.0

# Validate a config file without starting the pipeline
fluentd -c /etc/fluentd/fluentd.conf --dry-run

# Expected output (abridged; exact lines vary by version):
# 2026-06-24 10:00:00 +0000 [info]: parsing config file is succeeded path="/etc/fluentd/fluentd.conf"
# 2026-06-24 10:00:00 +0000 [info]: starting fluentd-1.17.0 as dry run mode
# 2026-06-24 10:00:00 +0000 [info]: finished dry run mode

Basic Configuration

# /etc/fluentd/fluentd.conf
<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/fluentd/app.pos
  tag app.*
  <parse>
    @type json
  </parse>
</source>

<source>
  @type tail
  path /var/log/syslog
  pos_file /var/log/fluentd/syslog.pos
  tag system.syslog
  <parse>
    @type syslog
  </parse>
</source>

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<filter app.**>
  @type record_transformer
  <record>
    hostname ${hostname}
    environment "#{ENV['NODE_ENV'] || 'production'}"
  </record>
</filter>

<filter app.**>
  @type grep
  <exclude>
    key level
    pattern ^debug$
  </exclude>
</filter>

<match app.**>
  @type copy
  <store>
    @type elasticsearch
    host elasticsearch.dodatech.com
    port 9200
    logstash_format true
    logstash_prefix dodazip-logs
    <buffer>
      @type file
      path /var/log/fluentd/buffer/elasticsearch
      flush_interval 5s
      chunk_limit_size 8m
      retry_max_times 5
    </buffer>
  </store>
  <store>
    @type s3
    s3_bucket dodatech-logs
    s3_region us-east-1
    path logs/${tag}/%Y/%m/%d/
    <buffer tag,time>
      @type file
      path /var/log/fluentd/buffer/s3
      timekey 1h
      flush_interval 60s
      chunk_limit_size 256m
    </buffer>
    <format>
      @type json
    </format>
  </store>
</match>

<match system.**>
  @type elasticsearch
  host elasticsearch.dodatech.com
  port 9200
  logstash_format true
  logstash_prefix system-logs
  <buffer>
    flush_interval 10s
  </buffer>
</match>

<match **>
  @type stdout
</match>
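
The forward source above listens on port 24224 for events sent by other Fluentd nodes. As a minimal sketch of the sending side, a remote node could use the built-in forward output as shown here. The host name aggregator.example.internal and the buffer path are placeholders, not values from this guide.

```conf
# Sender-side config on a remote node (sketch; host and path are placeholders)
<match **>
  @type forward
  <server>
    # Hypothetical address of the node running the forward <source>
    host aggregator.example.internal
    port 24224
  </server>
  <buffer>
    # A file buffer keeps events on disk while the aggregator is unreachable
    @type file
    path /var/log/fluentd/buffer/forward
    flush_interval 5s
  </buffer>
</match>
```

Adding a second <server> block gives the sender another aggregator to use when the first is unavailable.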

Input Plugins

# Tail input (for log files)
<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/fluentd/nginx-access.pos
  tag nginx.access
  <parse>
    @type nginx
  </parse>
</source>

# HTTP input (for application log shipping)
<source>
  @type http
  port 9880
  bind 0.0.0.0
  <parse>
    @type json
  </parse>
</source>

# Syslog input
<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag system
</source>

# Windows Event Log
<source>
  @type windows_eventlog
  channels Application,System,Security
  tag windows.eventlog
  read_interval 2
</source>

Parsers

# JSON parser
<parse>
  @type json
  time_key timestamp
  time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>

# Regex parser for custom formats
<parse>
  @type regexp
  expression /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.+)$/
  time_format %Y-%m-%d %H:%M:%S
</parse>

# Apache access log parser (use @type nginx for NGINX access logs)
<parse>
  @type apache2
</parse>

# CSV parser
<parse>
  @type csv
  keys col1,col2,col3
  delimiter ,
</parse>

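# Multi-line parser (for stack traces)
# format1..formatN are joined into one regexp and matched against the whole
# block, so the message capture also includes the stack trace lines.
<parse>
  @type multiline
  format_firstline /^\d{4}-\d{2}-\d{2}/
  format1 /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.*)/
  time_format %Y-%m-%d %H:%M:%S
</parse>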
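
One way to check a parser and its time_format before deploying is to run it against a few sample lines and print the result. The sketch below assumes a hypothetical sample file /tmp/sample.log and a hypothetical config file name parser-test.conf; neither comes from this guide.

```conf
# parser-test.conf (hypothetical file name)
<source>
  @type tail
  # Hypothetical sample file containing a few representative log lines
  path /tmp/sample.log
  pos_file /tmp/sample.log.pos
  tag test.sample
  # Read lines already in the file instead of waiting for new ones
  read_from_head true
  <parse>
    @type regexp
    expression /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.+)$/
    time_format %Y-%m-%d %H:%M:%S
  </parse>
</source>

# Print every parsed event so the extracted fields can be inspected
<match test.**>
  @type stdout
</match>
```

Run it with fluentd -c parser-test.conf. Lines that match are printed as structured events, while lines that do not match should show up as "pattern not matched" warnings in Fluentd's own log.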

Filters

# Record enrichment
<filter **>
  @type record_transformer
  enable_ruby
  <record>
    hostname ${Socket.gethostname}
    tag ${tag}
    @timestamp ${time.iso8601}
  </record>
</filter>

# Drop debug logs
<filter **>
  @type grep
  <exclude>
    key level
    pattern ^debug$
  </exclude>
</filter>

# GeoIP enrichment (fluent-plugin-geoip; "remote" is the client IP field from the nginx parser)
<filter nginx.access>
  @type geoip
  geoip_lookup_keys remote
  backend_library geoip2_c
  geoip2_database /etc/fluentd/GeoLite2-City.mmdb
  <record>
    city ${city.names.en["remote"]}
    country ${country.iso_code["remote"]}
  </record>
  skip_adding_null_record true
</filter>

Output and Buffering

# Elasticsearch output with buffer
<match app.**>
  @type elasticsearch
  host elasticsearch.dodatech.com
  port 9200
  user "#{ENV['ES_USER']}"
  password "#{ENV['ES_PASSWORD']}"
  scheme https

  <buffer>
    @type file
    path /var/log/fluentd/buffer/es
    flush_mode interval
    flush_interval 5s
    flush_thread_count 4
    chunk_limit_size 8m
    chunk_limit_records 5000
    total_limit_size 10g
    retry_max_interval 30
    retry_forever false
    retry_max_times 10
    overflow_action block
  </buffer>
</match>

# S3 output with compression
<match archive.**>
  @type s3
  s3_bucket dodatech-logs-archive
  s3_region us-east-1
  path ${tag[1]}/%Y/%m/%d/
  store_as gzip
  <buffer tag,time>
    @type file
    path /var/log/fluentd/buffer/s3
    timekey 1h
    flush_interval 10m
    chunk_limit_size 256m
  </buffer>
  <format>
    @type json
  </format>
</match>

Kubernetes DaemonSet Deployment

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: logging
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccountName: fluentd
      containers:
        - name: fluentd
          image: fluent/fluentd-kubernetes-daemonset:v1.17-debian-elasticsearch
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: "elasticsearch.logging"
            - name: FLUENT_ELASTICSEARCH_PORT
              value: "9200"
            - name: FLUENT_ELASTICSEARCH_BUFFER_SIZE
              value: "8m"
          resources:
            limits:
              memory: 500Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: dockerlog
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: fluentd-config
              mountPath: /fluentd/etc
            - name: buffer
              mountPath: /var/log/fluentd/buffer
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: dockerlog
          hostPath:
            path: /var/lib/docker/containers
        - name: fluentd-config
          configMap:
            name: fluentd-config
        - name: buffer
          hostPath:
            path: /var/log/fluentd/buffer
            type: DirectoryOrCreate
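
The manifest mounts a ConfigMap named fluentd-config at /fluentd/etc but does not show its contents. As a rough sketch, the Fluentd config inside that ConfigMap might look like the following. It assumes the nodes write Docker-style JSON log lines and that the image bundles the kubernetes_metadata filter and the Elasticsearch output; clusters using containerd write a different line format and need a different parser. The pos_file and buffer paths are illustrative choices under the mounted /var/log.

```conf
# fluent.conf stored in the fluentd-config ConfigMap (sketch under the assumptions above)
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd/containers.pos
  tag kubernetes.*
  read_from_head true
  <parse>
    # Assumes Docker's json-file log format
    @type json
    time_key time
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

# Adds namespace, pod name, and container name to each record
<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

<match kubernetes.**>
  @type elasticsearch
  # Reads the values set in the DaemonSet's env section
  host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
  port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
  logstash_format true
  <buffer>
    @type file
    path /var/log/fluentd/buffer/es
    flush_interval 5s
  </buffer>
</match>
```

The serviceAccountName in the manifest matters here: the metadata filter queries the Kubernetes API, so the fluentd service account needs read access to pods and namespaces.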

Common Configuration Mistakes

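  1. Not setting pos_file for tail inputs: Without a position file, Fluentd cannot resume where it stopped. After a restart it either skips lines written while it was down or, with read_from_head true, re-reads files from the beginning and emits duplicates. Always set pos_file to a persistent path.

  2. Missing buffer configuration for outputs: Buffered outputs default to an in-memory buffer, so queued logs are lost if Fluentd stops or crashes while the destination is unreachable. File buffers (@type file) survive restarts and let Fluentd retry delivery.

  3. Incorrect time format parsing: Logs whose timestamps do not match time_format fail to parse. Note that fluentd -c config.conf --dry-run only validates the configuration; to check a time format, run the parser against sample log lines with a stdout output and watch for parse warnings.

  4. Single-threaded Fluentd for high-throughput: By default, Fluentd runs one worker process and each output flushes with a single thread. Increase flush_thread_count in the buffer section and set workers N in the <system> section for parallel processing.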

  5. Not filtering debug logs before Elasticsearch: Debug-level logs can overwhelm Elasticsearch storage. Use a grep filter to exclude debug levels before the output match.
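
For mistake 4, a multi-worker setup could be sketched as follows. The worker count of 4 is an arbitrary illustration, not a recommendation from this guide. The tail input cannot run in several workers at once, so the sketch pins it to a single worker with a <worker> directive.

```conf
<system>
  # Illustrative value; size this to the available CPU cores
  workers 4
</system>

# in_tail does not support multiple workers, so run it in worker 0 only
<worker 0>
  <source>
    @type tail
    path /var/log/app/*.log
    pos_file /var/log/fluentd/app.pos
    tag app.*
    <parse>
      @type json
    </parse>
  </source>
</worker>

# Inputs that support multiple workers, such as forward, can stay at the top level
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>
```

Outputs defined at the top level run in every worker, so each worker flushes its own buffer in parallel.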

Practice Questions

  1. What is a Fluentd plugin? Answer: Plugins extend Fluentd functionality: input plugins (tail, http, syslog), parser plugins (json, regexp, apache2), filter plugins (record_transformer, grep), and output plugins (elasticsearch, s3, kafka).

  2. How does Fluentd ensure reliable log delivery? Answer: File buffers store logs on disk before forwarding. If the destination is unreachable, Fluentd retries with exponential backoff. Once delivered, the buffered chunks are deleted.

  3. What is the purpose of pos_file? Answer: The position file tracks how much of each log file Fluentd has read. On restart, Fluentd resumes from the last position, preventing duplicate or missed log entries.

  4. How do you parse multi-line logs (stack traces) in Fluentd? Answer: Use the multiline parser. format_firstline identifies the first line of each multi-line block; format1 through formatN are joined into a single regular expression that is matched against the whole block.
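
The retry behavior described in question 2 is controlled in the buffer section. The sketch below spells out the exponential backoff settings explicitly and adds a fallback output. The retry_timeout of 1h and both directory paths are illustrative assumptions, not values from this guide.

```conf
<match app.**>
  @type elasticsearch
  host elasticsearch.dodatech.com
  port 9200
  <buffer>
    @type file
    path /var/log/fluentd/buffer/es-retry
    # Wait roughly 1s, 2s, 4s, 8s, ... between attempts, capped at 30s
    retry_type exponential_backoff
    retry_wait 1s
    retry_exponential_backoff_base 2
    retry_max_interval 30s
    # Illustrative limit: stop retrying a chunk after one hour
    retry_timeout 1h
  </buffer>
  # Fallback for chunks that keep failing: write them to local files
  <secondary>
    @type secondary_file
    directory /var/log/fluentd/failed
  </secondary>
</match>
```

Actual wait times vary slightly because Fluentd randomizes retry intervals by default. Chunks written by the secondary output can be inspected or re-sent later instead of being dropped.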

Challenge

Set up a complete Fluentd logging pipeline: install Fluentd on a server, configure tail inputs for NGINX access logs and application JSON logs, parse both formats correctly, add a record_transformer filter to add hostname and environment, route application logs to Elasticsearch with a file buffer, route system logs to S3 with gzip compression, filter out debug-level logs, configure forward input to receive logs from remote servers, and deploy as a Kubernetes DaemonSet.

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.