Fluentd: Log Collection & Aggregation Guide
This tutorial covers Fluentd's key concepts, practical configuration examples, and best practices.
Fluentd is an open-source data collector for unified logging. It lets you collect logs from hundreds of sources, process them in real time, and forward them to multiple destinations (Elasticsearch, S3, Kafka, Grafana Loki) through a plugin ecosystem.
What You'll Learn
Why It Matters
Applications and infrastructure produce logs in different formats, locations, and cadences. Without a unified logging layer, you must manually configure each log source to send to each destination. Fluentd provides a single configuration file that collects all logs, parses them into a structured format, and routes them to any number of outputs. DodaTech processes 5TB of logs daily through Fluentd, routing application logs to Elasticsearch, infrastructure logs to S3, and security audit logs to a dedicated SIEM.
Real-World Use
DodaZIP's Kubernetes cluster runs Fluentd as a DaemonSet on every node. It collects container stdout/stderr logs, Kubernetes events, and node system logs, adds Kubernetes metadata (namespace, pod name, container name), and forwards structured JSON logs to Elasticsearch for Kibana visualization and to S3 for long-term archival.
flowchart TD
A[Log Sources] --> B[Fluentd Agent]
C[Docker Containers] --> B
D[System Logs] --> B
E[Application Files] --> B
B --> F[Input Plugin]
F --> G[Parser Plugin]
G --> H[Filter Plugin]
H --> I[Output Plugin]
I --> J[Elasticsearch]
I --> K[S3]
I --> L[Kafka]
I --> M[Loki]
B --> N[Buffer Plugin]
N --> I
style B fill:#0E5C8A,color:#fff
Prerequisites: Basic Linux administration. Understanding of logging patterns and JSON format.
Installation
# Install Fluentd via Ruby gem
gem install fluentd --no-doc
# Or via fluent-package (the packaged Fluentd distribution that succeeded td-agent;
# td-agent 4 has no Ubuntu Noble build)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-noble-fluent-package5-lts.sh | sh
# Verify
fluentd --version
# Expected output:
# fluentd 1.17.0
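# Validate an existing config without starting the pipeline
fluentd -c /etc/fluentd/fluentd.conf --dry-run
# Expected output (abridged; exact wording varies by version):
# 2026-06-24 10:00:00 +0000 [info]: parsing config file is succeeded path="/etc/fluentd/fluentd.conf"
# 2026-06-24 10:00:00 +0000 [info]: starting fluentd-1.17.0 as dry run mode
# 2026-06-24 10:00:00 +0000 [info]: finished dry run mode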
Basic Configuration
# /etc/fluentd/fluentd.conf
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluentd/app.pos
tag app.*
<parse>
@type json
</parse>
</source>
<source>
@type tail
path /var/log/syslog
pos_file /var/log/fluentd/syslog.pos
tag system.syslog
<parse>
@type syslog
</parse>
</source>
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
<filter app.**>
@type record_transformer
<record>
hostname ${hostname}
environment "#{ENV['NODE_ENV'] || 'production'}"
</record>
</filter>
<filter app.**>
@type grep
<exclude>
key level
pattern ^debug$
</exclude>
</filter>
<match app.**>
@type copy
<store>
@type elasticsearch
host elasticsearch.dodatech.com
port 9200
logstash_format true
logstash_prefix dodazip-logs
<buffer>
@type file
path /var/log/fluentd/buffer/elasticsearch
flush_interval 5s
chunk_limit_size 8m
retry_max_times 5
</buffer>
</store>
<store>
@type s3
s3_bucket dodatech-logs
s3_region us-east-1
path logs/${tag}/%Y/%m/%d/
# ${tag} and %Y/%m/%d in path require tag and time as buffer chunk keys
<buffer tag,time>
timekey 1d
@type file
path /var/log/fluentd/buffer/s3
flush_interval 60s
chunk_limit_size 256m
</buffer>
<format>
@type json
</format>
</store>
</match>
<match system.**>
@type elasticsearch
host elasticsearch.dodatech.com
port 9200
logstash_format true
logstash_prefix system-logs
<buffer>
flush_interval 10s
</buffer>
</match>
<match **>
@type stdout
</match>
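The routing in the configuration above depends on how tags are matched against patterns. The following Python sketch illustrates the documented rules as understood here: `*` matches exactly one tag part, `**` matches zero or more parts, and the first matching `<match>` block wins. The function names and the `ROUTES` table are illustrative assumptions, not part of Fluentd, and the sketch ignores other pattern forms such as `{a,b}`.

```python
def tag_matches(pattern: str, tag: str) -> bool:
    """Return True if a dot-separated tag matches a simplified Fluentd-style pattern."""
    def match(p, t):
        if not p:
            return not t
        head, rest = p[0], p[1:]
        if head == "**":
            # '**' may consume zero or more tag parts
            return any(match(rest, t[i:]) for i in range(len(t) + 1))
        if not t:
            return False
        if head == "*" or head == t[0]:
            # '*' consumes exactly one tag part
            return match(rest, t[1:])
        return False

    return match(pattern.split("."), tag.split("."))


# Hypothetical routing table mirroring the order of the <match> blocks above.
ROUTES = [
    ("app.**", "elasticsearch+s3"),
    ("system.**", "elasticsearch"),
    ("**", "stdout"),
]


def route(tag: str):
    """Return the output label of the first pattern that matches, like <match> ordering."""
    for pattern, output in ROUTES:
        if tag_matches(pattern, tag):
            return output
    return None


if __name__ == "__main__":
    for tag in ["app.var.log.app.web.log", "system.syslog", "nginx.access"]:
        print(tag, "->", route(tag))
```

Because the catch-all `**` block is listed last, it only receives events that no earlier block claimed; moving it to the top would swallow everything.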
Input Plugins
# Tail input (for log files)
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/fluentd/nginx-access.pos
tag nginx.access
<parse>
@type nginx
</parse>
</source>
# HTTP input (for application log shipping)
<source>
@type http
port 9880
bind 0.0.0.0
<parse>
@type json
</parse>
</source>
# Syslog input
<source>
@type syslog
port 5140
bind 0.0.0.0
tag system
</source>
# Windows Event Log
<source>
@type windows_eventlog
channels Application,System,Security
tag windows.eventlog
read_interval 2
</source>
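To show how an application might ship a record to the HTTP input above, here is a minimal Python sketch. It assumes the input listens on `localhost:9880`, that the URL path is used as the event tag, and that a JSON body with a `Content-Type: application/json` header is accepted. The function names and the default `base_url` are illustrative, not part of any Fluentd client library.

```python
import json
import urllib.request


def build_log_request(tag, record, base_url="http://localhost:9880"):
    """Build a POST request whose path is the tag and whose body is the JSON record."""
    body = json.dumps(record).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/{tag}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_log(tag, record, base_url="http://localhost:9880", timeout=5.0):
    """Send one record and return the HTTP status code (requires a running Fluentd)."""
    req = build_log_request(tag, record, base_url)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status


if __name__ == "__main__":
    req = build_log_request("app.web", {"level": "info", "message": "user logged in"})
    print(req.get_method(), req.full_url)
```

Building the request separately from sending it keeps the payload easy to inspect without a live Fluentd instance.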
Parsers
# JSON parser
<parse>
@type json
time_key timestamp
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
# Regex parser for custom formats
<parse>
@type regexp
expression /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.+)$/
time_format %Y-%m-%d %H:%M:%S
</parse>
# Apache/NGINX parser
<parse>
@type apache2
</parse>
# CSV parser
<parse>
@type csv
keys col1,col2,col3
delimiter ,
</parse>
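# Multi-line parser (for stack traces)
# format_firstline marks where a new event starts; the formatN patterns are
# joined into one regex and matched against the whole multi-line event,
# so the stack trace lines end up inside the message field.
<parse>
@type multiline
format_firstline /^\d{4}-\d{2}-\d{2}/
format1 /^(?<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>\w+)\] (?<message>.*)/
time_format %Y-%m-%d %H:%M:%S
</parse>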
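Before deploying a regexp or multiline parser, it can help to try the pattern against sample lines. The Python sketch below approximates the idea: lines are grouped into events whenever a line matches the first-line pattern, then each event is matched as a whole. It is an illustration only; Fluentd uses Ruby regular expressions (`(?<name>...)` rather than Python's `(?P<name>...)`), and the sample log lines and function names are made up for the example.

```python
import re
from datetime import datetime

# Python equivalents of the patterns in the parser examples above.
FIRSTLINE = re.compile(r"^\d{4}-\d{2}-\d{2}")
EVENT = re.compile(
    r"^(?P<time>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(?P<level>\w+)\] (?P<message>.*)",
    re.DOTALL,  # let '.' span newlines so continuation lines stay in the message
)


def group_events(lines):
    """Start a new event at every line that matches the first-line pattern."""
    events, current = [], []
    for line in lines:
        if FIRSTLINE.match(line) and current:
            events.append("\n".join(current))
            current = []
        current.append(line)
    if current:
        events.append("\n".join(current))
    return events


def parse_event(text):
    """Return a record dict with a parsed timestamp, or None if the event does not match."""
    m = EVENT.match(text)
    if m is None:
        return None
    record = m.groupdict()
    record["time"] = datetime.strptime(record["time"], "%Y-%m-%d %H:%M:%S")
    return record


if __name__ == "__main__":
    sample = [
        "2026-06-24 10:00:00 [ERROR] Unhandled exception",
        "    at app.main(app.py:10)",
        "2026-06-24 10:00:01 [INFO] Recovered",
    ]
    for event in group_events(sample):
        print(parse_event(event))
```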
Filters
# Record enrichment
<filter **>
@type record_transformer
enable_ruby true
<record>
hostname ${Socket.gethostname}
tag ${tag}
@timestamp ${time.iso8601}
</record>
</filter>
# Drop debug logs
<filter **>
@type grep
<exclude>
key level
pattern ^debug$
</exclude>
</filter>
# GeoIP enrichment (fluent-plugin-geoip; looks up the client IP in the "remote" field)
<filter nginx.access>
@type geoip
geoip_lookup_keys remote
backend_library geoip2_c
geoip2_database /etc/fluentd/GeoLite2-City.mmdb
<record>
city ${city.names.en["remote"]}
country ${country.iso_code["remote"]}
</record>
skip_adding_null_record true
</filter>
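Conceptually, a filter chain is a sequence of functions applied to each record: an enrichment step that adds fields, followed by a grep step that may drop the record. The Python sketch below models the record_transformer and grep examples above under that assumption. The function names are hypothetical and this is not how Fluentd is implemented internally.

```python
import re
import socket

DEBUG_LEVEL = re.compile(r"^debug$")


def enrich(record, tag, hostname=None):
    """Mimic record_transformer: return a copy with hostname and tag fields added."""
    out = dict(record)
    out["hostname"] = hostname or socket.gethostname()
    out["tag"] = tag
    return out


def grep_exclude(record, key="level", pattern=DEBUG_LEVEL):
    """Mimic grep <exclude>: return None when the field matches, else the record."""
    value = record.get(key)
    if value is not None and pattern.search(str(value)):
        return None
    return record


def run_filters(records, tag, hostname=None):
    """Apply the filters in the same order as the configuration: enrich, then grep."""
    kept = []
    for record in records:
        record = grep_exclude(enrich(record, tag, hostname))
        if record is not None:
            kept.append(record)
    return kept


if __name__ == "__main__":
    events = [
        {"level": "debug", "message": "cache miss"},
        {"level": "info", "message": "request served"},
    ]
    print(run_filters(events, "app.web", hostname="node-1"))
```

Placing the grep step before the enrichment step would avoid transforming records that are about to be dropped, which can matter at high volume.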
Output and Buffering
# Elasticsearch output with buffer
<match app.**>
@type elasticsearch
host elasticsearch.dodatech.com
port 9200
user "#{ENV['ES_USER']}"
password "#{ENV['ES_PASSWORD']}"
scheme https
<buffer>
@type file
path /var/log/fluentd/buffer/es
flush_mode interval
flush_interval 5s
flush_thread_count 4
chunk_limit_size 8m
chunk_limit_records 5000
total_limit_size 10g
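# queue_limit_length is deprecated; setting it alongside total_limit_size can
# silently shrink the effective limit to chunk_limit_size * queue_limit_length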
retry_max_interval 30
retry_forever false
retry_max_times 10
overflow_action block
</buffer>
</match>
# S3 output with compression
<match archive.**>
@type s3
s3_bucket dodatech-logs-archive
s3_region us-east-1
path ${tag[1]}/%Y/%m/%d/
# ${tag[1]} and %Y/%m/%d in path require tag and time as buffer chunk keys
<buffer tag,time>
timekey 1d
@type file
path /var/log/fluentd/buffer/s3
flush_interval 10m
chunk_limit_size 256m
</buffer>
# Compression is an option of the s3 output itself, not of the formatter
store_as gzip
<format>
@type json
</format>
</match>
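The retry settings in the Elasticsearch buffer above determine how long Fluentd keeps trying before it gives up on a chunk. The Python sketch below estimates the wait schedule, assuming an initial wait of 1 second and a backoff base of 2 (believed to be the defaults for `retry_wait` and `retry_exponential_backoff_base`) and ignoring the random jitter Fluentd normally adds. The function name is illustrative.

```python
def retry_schedule(retry_max_times, retry_wait=1.0, backoff_base=2.0, retry_max_interval=None):
    """Return the approximate wait in seconds before each retry attempt."""
    waits = []
    for attempt in range(retry_max_times):
        wait = retry_wait * (backoff_base ** attempt)
        if retry_max_interval is not None:
            # Cap the exponential growth, as retry_max_interval does
            wait = min(wait, retry_max_interval)
        waits.append(wait)
    return waits


if __name__ == "__main__":
    schedule = retry_schedule(retry_max_times=10, retry_max_interval=30)
    print(schedule)
    print("total seconds before giving up:", sum(schedule))
```

Under these assumptions, `retry_max_times 10` with `retry_max_interval 30` gives a retry window of roughly three minutes, so a longer destination outage needs a higher retry count or a larger interval.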
Kubernetes DaemonSet Deployment
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: logging
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccountName: fluentd
      containers:
        - name: fluentd
          image: fluent/fluentd-kubernetes-daemonset:v1.17-debian-elasticsearch
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: "elasticsearch.logging"
            - name: FLUENT_ELASTICSEARCH_PORT
              value: "9200"
            - name: FLUENT_ELASTICSEARCH_BUFFER_SIZE
              value: "8m"
          resources:
            limits:
              memory: 500Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: dockerlog
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: fluentd-config
              mountPath: /fluentd/etc
            - name: buffer
              mountPath: /var/log/fluentd/buffer
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: dockerlog
          hostPath:
            path: /var/lib/docker/containers
        - name: fluentd-config
          configMap:
            name: fluentd-config
        - name: buffer
          hostPath:
            path: /var/log/fluentd/buffer
            type: DirectoryOrCreate
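Part of the Kubernetes metadata mentioned earlier (namespace, pod name, container name) can be recovered from the log file name alone. The Python sketch below assumes the kubelet's usual naming scheme for files under `/var/log/containers`, which is `<pod>_<namespace>_<container>-<64-hex-container-id>.log`. The function name and sample file name are hypothetical, and a real metadata filter plugin typically also queries the Kubernetes API for labels and annotations.

```python
import re

# Assumed layout: <pod>_<namespace>_<container>-<64 hex chars>.log
CONTAINER_LOG = re.compile(
    r"^(?P<pod_name>[^_]+)_(?P<namespace>[^_]+)_"
    r"(?P<container_name>.+)-(?P<container_id>[0-9a-f]{64})\.log$"
)


def metadata_from_filename(filename):
    """Extract pod, namespace, container name and ID from a container log file name."""
    m = CONTAINER_LOG.match(filename)
    return m.groupdict() if m else None


if __name__ == "__main__":
    name = "web-7d9f8_prod_nginx-" + "a" * 64 + ".log"
    print(metadata_from_filename(name))
```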
Common Configuration Mistakes
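Not setting pos_file for tail inputs: Without a position file, Fluentd cannot resume where it left off after a restart. It either re-reads files from the beginning (duplicate entries, when read_from_head is enabled) or starts at the end and skips lines written while it was down. Always set pos_file to a persistent path.
Missing buffer configuration for outputs: Most outputs buffer in memory by default, so queued chunks are lost if Fluentd restarts or crashes during a destination outage. File buffers (@type file) persist chunks to disk and are retried after a restart.
Incorrect time format parsing: Logs whose timestamps do not match time_format fail to parse. Note that fluentd -c config.conf --dry-run only validates the configuration; to check time parsing, run sample lines through the pipeline with a stdout output and watch for parse warnings.
Single-threaded Fluentd for high throughput: By default Fluentd runs one worker process and flushes each output with one thread. Increase flush_thread_count in the buffer section and set workers N in the <system> section for parallel processing.
Not filtering debug logs before Elasticsearch: Debug-level logs can overwhelm Elasticsearch storage. Use a grep filter to exclude debug levels before the output match.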
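The role of the position file can be illustrated with a small Python sketch: the reader stores the byte offset it has consumed, and on the next run it seeks to that offset instead of starting over. This is a simplified model under stated assumptions; the real pos_file format also records the file path and inode, and the function name here is hypothetical.

```python
import os


def read_new_lines(log_path, pos_path):
    """Return complete lines appended since the last call, persisting the offset."""
    offset = 0
    if os.path.exists(pos_path):
        with open(pos_path) as f:
            offset = int(f.read().strip() or 0)

    # If the file is shorter than the saved offset it was truncated or rotated.
    if offset > os.path.getsize(log_path):
        offset = 0

    with open(log_path, "rb") as f:
        f.seek(offset)
        data = f.read()

    # Consume only complete lines; a partial last line is left for the next call.
    end = data.rfind(b"\n") + 1
    lines = data[:end].decode("utf-8").splitlines()

    with open(pos_path, "w") as f:
        f.write(str(offset + end))
    return lines
```

Deleting the position file makes the next call start again from offset zero, which is the duplicate-entry scenario described above.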
Practice Questions
What is a Fluentd plugin? Answer: Plugins extend Fluentd's functionality: input plugins (tail, http, syslog), parser plugins (json, regexp, apache2), filter plugins (record_transformer, grep), and output plugins (elasticsearch, s3, kafka).
How does Fluentd ensure reliable log delivery? Answer: File buffers store logs on disk before forwarding. If the destination is unreachable, Fluentd retries with exponential backoff. Once delivered, the buffered chunks are deleted.
What is the purpose of pos_file? Answer: The position file tracks how much of each log file Fluentd has read. On restart, Fluentd resumes from the last position, preventing duplicate or missed log entries.
How do you parse multi-line logs (stack traces) in Fluentd? Answer: Use the multiline parser. format_firstline identifies the first line of each multi-line block, and the format1, format2, ... patterns are joined into a single regular expression that is matched against the whole block.
Challenge
Set up a complete Fluentd logging pipeline: install Fluentd on a server, configure tail inputs for NGINX access logs and application JSON logs, parse both formats correctly, add a record_transformer filter to add hostname and environment, route application logs to Elasticsearch with a file buffer, route system logs to S3 with gzip compression, filter out debug-level logs, configure forward input to receive logs from remote servers, and deploy as a Kubernetes DaemonSet.
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.