Table of Contents
- Plugin Documentation
  - Sources
    - Plugin: tail-file
    - Plugin: read-file
    - Plugin: read-http
    - Plugin: read-journal
    - Plugin: read-kafka
    - Plugin: read-stdin
    - Plugin: query-elasticsearch
  - Sinks
    - Plugin: index-elasticsearch
    - Plugin: write-file
    - Plugin: write-http
    - Plugin: send-email
  - Observability
    - Plugin: log
    - Plugin: errors
    - Plugin: stats
    - Plugin: sample
    - Plugin: tap
    - Plugin: parse-otel
  - Parsing & Serialization
    - Plugin: parse-json
    - Plugin: parse-lines
    - Plugin: parse-yaml
    - Plugin: serialize-json
    - Plugin: serialize-yaml
  - Miscellaneous
    - Plugin: agg
    - Plugin: cast
    - Plugin: drop
    - Plugin: fail2ban
    - Plugin: gc
    - Plugin: geoip
    - Plugin: js
    - Plugin: keep
    - Plugin: rename
Plugin Documentation
Sources
Plugin: tail-file
This plugin watches files for changes and emits new records. Tracks file positions and inodes in a state file to handle rotations. Re-evaluates globs periodically to discover new files. Supports starting from beginning or end of files.
Config
Required:
- globs (string[]): Array of glob patterns for files to tail
Optional:
- separator (string): Record delimiter (default: \n)
- encoding (string): File encoding (default: utf8; options: utf8, ascii, hex, base64, binary)
- trim (boolean): Strip whitespace from records
- db (string): State file path (default: {STAGE}.taildb)
- intervalSeconds (number): Glob re-evaluation interval (default: 60)
- tailInterval (number): File check interval in milliseconds
- fromStart (boolean): Read new files from beginning instead of end
Example
tail-file:
  config:
    globs:
      - /var/log/app/*.log
    separator: "\n"
    encoding: utf8
    db: ./.logbus-state.json
    intervalSeconds: 60
    fromStart: true
Plugin: read-file
This plugin reads files at startup, splitting content by separator and emitting records. Supports glob patterns, multiple encodings, and throttled reading. Pipeline shuts down after all files are consumed. For continuous file monitoring, use tail-file instead.
Config
Required:
- globs (string[]): Array of glob patterns for files to read
Optional:
- separator (string): Record delimiter (default: \n)
- encoding (string): File encoding (default: utf8; options: utf8, ascii, hex, base64)
- trim (boolean): Strip whitespace from records
- maxMbps (number): Throttle rate in MB/s (default: 100)
Example
read-file:
  config:
    globs:
      - /var/log/app/*.log
      - /var/log/system.log
    separator: "\n"
    encoding: utf8
    trim: true
    maxMbps: 50
Plugin: read-http
This plugin creates an HTTP server that listens for requests and emits them as events. Parses JSON bodies automatically based on Content-Type header. Each request is emitted with method, headers, query parameters, and body.
Config
Optional:
- port (number): Port to listen on (default: 8080)
- host (string): Host to bind to (default: 0.0.0.0)
Example
read-http:
  config:
    port: 3000
    host: localhost
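Each emitted event carries the request details listed above. A hypothetical sketch of an event produced for a POST to /ingest?src=app with a JSON body (the exact field layout is an assumption based on the description):
method: POST
headers:
  content-type: application/json
query:
  src: app
body:
  msg: hi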
Plugin: read-journal
This plugin queries the systemd journal at intervals and emits entries as events. Tracks last cursor/timestamp in a state file to avoid re-processing. Supports starting from a relative time on first run.
Config
Optional:
- journalctl (string): Path to journalctl binary (default: journalctl)
- intervalSeconds (number): Query interval (default: 60)
- db (string): State file path (default: logbus-journal-db)
- sinceMinutes (number): Look back this many minutes on first run
Example
read-journal:
  config:
    sinceMinutes: 2880 # 48 hours
    intervalSeconds: 10
    db: .logbus-state.json
Plugin: read-kafka
This plugin connects to Kafka brokers and consumes messages from a specified topic. Messages are emitted as events with the value in the payload field. Supports consumer groups and optional max message limits.
Config
Required:
- brokers (string[]): Array of Kafka broker addresses
- topic (string): Topic to consume from
- group (string): Consumer group ID
Optional:
- client (string): Kafka client ID (default: logbus)
- max (number): Maximum messages to consume before stopping
- replay (boolean): Start from beginning of topic
Example
read-kafka:
  config:
    brokers:
      - kafka1:9092
      - kafka2:9092
    topic: logs
    group: logbus-consumer
    client: logbus-instance-1
Plugin: read-stdin
This plugin reads from stdin, splits input by a matcher pattern, and emits each line as an event. Supports throttling and custom encodings. By default, shuts down the pipeline when stdin closes.
Config
Optional:
- encoding (string): Input encoding (default: utf8; options: utf8, ascii, hex, base64, binary)
- matcher (string | regex): Line separator pattern (default: \r?\n)
- maxMbps (number): Throttle rate in MB/s (default: 100)
- stopOnEOF (boolean): Shutdown pipeline on EOF (default: true)
Example
read-stdin:
  config:
    encoding: utf8
    matcher: "\n"
    maxMbps: 50
    stopOnEOF: false
Plugin: query-elasticsearch
This plugin executes a search against Elasticsearch and emits each hit as an event. Uses the scroll API to paginate through results. Supports dynamic index patterns and custom search bodies. Pipeline shuts down when results are exhausted or max hits reached.
Config
Required:
- index (string | function): Index pattern to search
- search (object): Search body (Elasticsearch query DSL)
Optional:
- endpoint (string | function): Elasticsearch endpoint (default: http://localhost:9200)
- scroll (string): Scroll context timeout (default: 1m)
- max (number): Maximum hits to process (default: 0 = unlimited)
- ssl.ca (string): Path to CA certificate file
Example
query-elasticsearch:
  config:
    index: logs-*
    scroll: 1m
    search:
      size: 333
      query:
        range:
          timestamp:
            gte: now-1d
    endpoint: http://localhost:9200
    ssl:
      ca: ~/.certs/ca.pem
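Because index and endpoint also accept a function, the index pattern can be computed at query time. A hypothetical sketch using a date-based index name (the naming scheme is illustrative):
query-elasticsearch:
  config:
    index: !!js/function >-
      function() {
        // e.g. logs-2024-05-01 for the current date
        return `logs-${new Date().toISOString().slice(0, 10)}`
      }
    search:
      query:
        match_all: {}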
Sinks
Plugin: index-elasticsearch
This plugin buffers events and ships them to Elasticsearch using the /_bulk endpoint. Shipping occurs when the buffer reaches a size limit or at regular intervals. Supports custom SSL certificates and dynamic endpoint configuration.
Config
Optional:
- endpoint (string | function): Elasticsearch endpoint (default: http://localhost:9200)
- index (string): Default index name (default: logbus)
- bufferSize (number): Events to buffer before shipping (default: 1000)
- intervalSeconds (number): Interval between shipments (default: 60)
- ssl.ca (string): Path to CA certificate file
Event fields _index, _id, and _type are used for document metadata if present.
Example
NOTE: This plugin emits the response from each Elasticsearch bulk index request as a downstream event, so it is important to declare it as an output stage with outChannels: [].
index-elasticsearch:
  outChannels: []
  config:
    intervalSeconds: 10
    bufferSize: 1000
    endpoint: http://localhost:9200
    # endpoint: !!js/function >-
    #   function(event) {
    #     return `https://${process.env.ES_USER}:${process.env.ES_PASS}@your-site`
    #   }
    ssl:
      ca: ~/.certs/ca.pem
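To route documents per event, an upstream stage can set the _index metadata field, for example with the js plugin. A sketch; the env field and index naming are illustrative:
route-by-env:
  module: js
  config:
    function: !!js/function >-
      function(event) {
        // direct each document to an index derived from an event field
        event._index = `logs-${event.env || 'default'}`
        return event
      }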
Plugin: write-file
This plugin writes the payload field of events to a file. Supports multiple encodings and can either append or overwrite. Does not propagate events downstream (sets outChannels to empty array).
Config
Required:
- path (string): File path (supports ~ expansion)
Optional:
- encoding (string): Output encoding (default: utf8; options: utf8, ascii, hex, base64, binary)
- append (boolean): Append to file instead of overwriting
Example
write-file:
  config:
    path: ~/logs/output.log
    encoding: utf8
    append: true
Plugin: write-http
This plugin posts event payloads to an HTTP/HTTPS URL. Supports custom headers and methods. Waits for all inflight requests before shutdown.
Config
Required:
- url (string): Target HTTP endpoint
Optional:
- method (string): HTTP method (default: POST; options: GET, POST, DELETE, PUT, PATCH)
- headers: Custom headers to send with each request
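Example
A minimal sketch; the URL and header values are illustrative, and headers is assumed to be a name/value mapping:
write-http:
  config:
    url: https://collector.example.com/ingest
    method: POST
    headers:
      Content-Type: application/json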
Plugin: send-email
This plugin sends events as email messages. The event payload becomes the email body. Supports authenticated and unauthenticated SMTP. Waits for all inflight emails before shutdown.
Config
Required:
- to (string): Recipient email address
- from (string): Sender email address
- subject (string): Email subject
- host (string): SMTP server hostname
Optional:
- port (number): SMTP port (default: 587)
- user (string): SMTP username
- password (string): SMTP password
Example
send-email:
  config:
    to: alerts@example.com
    from: logbus@example.com
    subject: Alert from LogBus
    host: smtp.example.com
    port: 587
    user: logbus
    password: secret
Observability
Plugin: log
This plugin writes events to the LogBus internal Bunyan logger. It supports standard log levels and can extract level information from event fields. Adds hostname if not present. This plugin sets outChannels to an empty array, preventing downstream propagation.
Config
Optional:
- defaultLevel ('trace' | 'debug' | 'info' | 'warn' | 'error' | 'fatal'): Default log level if not specified in event (default: info)
- extra (Record<string, unknown>): Additional metadata to attach to all log entries
The log level is read from the event's level or severity field when present. Those values are mapped to {trace...fatal} as follows:
- Bunyan format: 10-60 (trace through fatal)
- Syslog format: 1-7 (fatal through trace)
- Named: trace, debug, info, warn, warning, error, err, fatal
Example
log:
  inChannels:
    - log-errors
    - log-stats
  config:
    defaultLevel: warn
    extra:
      env: prod
Plugin: errors
This plugin collects errors by stage and message, keeping only a sample of each unique error. Errors are aggregated and emitted periodically to minimize memory consumption and avoid flooding logs with duplicate error messages.
Config
Optional:
- intervalSeconds (number): How often to emit aggregated errors (default: 60)
- stackDepth (number): Number of stack trace levels to retain (default: 3)
Example
NOTE: There are two reserved channel names: stats & errors, so it is important to use a pipeline stage name other than errors to avoid a clash.
log-errors:
  module: errors
  inChannels:
    - errors
  config:
    intervalSeconds: 300
    stackDepth: 3
Plugin: stats
This plugin collects metrics from other pipeline stages and emits aggregated statistics events. Tracks standard metrics (errors, events, bytes, lines) and supports custom metrics. Includes memory usage information.
Config
Optional:
- intervalSeconds (number): Emit interval (default: 15)
- extra (string[]): Additional custom metric names to track
Standard Metrics:
- errors, rxEvents, txEvents, rxBytes, txBytes, rxLines, txLines
Example
NOTE: There are two reserved channel names: stats & errors, so it is important to use a pipeline stage name other than stats to avoid a clash.
log-stats:
  module: stats
  inChannels:
    - stats
  config:
    intervalSeconds: 300
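Custom metrics emitted by other stages via logbus.stats (as in the fail2ban example below) can be tracked by naming them in extra. A hypothetical sketch:
track-banned:
  module: stats
  inChannels:
    - stats
  config:
    extra:
      - banned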
Plugin: sample
This plugin emits events periodically based on count (every nth event) or time (every N seconds), or both.
Config
Required (at least one):
- nth (number): Emit every nth event
- intervalSeconds (number): Emit every N seconds
Example
sample:
  config:
    nth: 100
Plugin: tap
This plugin gates event flow using a custom function that returns true/false. The function is evaluated at startup and periodically thereafter. Events only flow downstream when the tap is "on".
Config
Required:
- tap (() => boolean | Promise): Function returning boolean to control event flow
Optional:
- intervalSeconds (number): Re-evaluation interval (default: 60)
Example
tap:
  config:
    tap: !!js/function >-
      function() {
        const hour = new Date().getHours()
        return hour >= 9 && hour <= 17
      }
    intervalSeconds: 300
Plugin: parse-otel
This plugin processes OTLP metrics from HTTP requests, supporting both JSON and Protobuf formats. Each datapoint is emitted as a separate LogBus event. Supports Gauge, Sum, Histogram, and Summary metric types.
Status: Work in Progress
Config
No configuration required.
Example
parse-otel:
  config: {}
Parsing & Serialization
Plugin: parse-json
This plugin attempts to parse the payload field as JSON. On success, it emits the parsed object. On failure, it logs an error with surrounding text context to aid troubleshooting.
Config
No configuration required.
Example
parse-json:
  config: {}
Plugin: parse-lines
This plugin splits incoming text by a separator (default newline) and emits each line as a separate event. Maintains a buffer for incomplete lines across chunks. Lines exceeding maxSize are truncated.
Config
Optional:
- separator (string): Line separator (default: \n)
- maxSize (number): Maximum line length in bytes (default: 65536)
Example
parse-lines:
  config:
    separator: "\n"
    maxSize: 1000
Plugin: parse-yaml
This plugin attempts to parse the payload field as YAML. On success, it emits the parsed object. On failure, it logs an error.
Config
No configuration required.
Example
parse-yaml:
  config: {}
Plugin: serialize-json
This plugin converts events to JSON strings and places them in the payload field. Supports custom indentation and line delimiters.
Config
Optional:
- indent (number): Spaces for indentation (default: 0 = compact)
- delimiter (string): String to append after JSON (default: \n)
Example
serialize-json:
  config:
    indent: 0
    delimiter: ""
Plugin: serialize-yaml
This plugin converts upstream events to YAML and emits them downstream as {payload: yaml(event)}.
Config
No configuration required.
Example
serialize-yaml:
  config: {}
Miscellaneous
Plugin: agg
This plugin collects events into buckets and emits aggregated events when conditions are met. It supports flexible grouping via key functions, filtering to select which events to aggregate, and custom view functions to transform buckets into output events. Buckets can be completed by size limits, time spans, or custom start/stop markers.
Config
Required:
- filtered (function): Returns true for events that should NOT be aggregated
- key (function): Returns the grouping key for an event
- view (function): Transforms a bucket array into a single aggregated event
Optional:
- tsField (string): Timestamp field name (default: ts)
- start (function): Returns true when an event should start a new bucket
- stop (function): Returns true when an event should complete a bucket
- maxSize (number | function): Maximum events per bucket (default: 1000)
- maxEventSeconds (number | function): Maximum time span between first and last event in seconds (default: 300)
- maxRealSeconds (number | function): Interval for flushing all buckets in seconds (default: 300)
Functions receive a sandbox context with config, util, moment, and hostname.
Example
This example assumes that an upstream stage has marked events that should be aggregated with an _agg object.
count-aggregated:
  module: agg
  config:
    maxSize: 1000
    filtered: !!js/function >-
      function(event) {
        return !event._agg?.type?.startsWith('count')
      }
    start: !!js/function >-
      function(event) {
        return true // let size or time trigger aggregation
      }
    stop: !!js/function >-
      function(event) {
        return false // let size or time trigger aggregation
      }
    key: !!js/function >-
      function(event) {
        return event._agg.key
      }
    view: !!js/function >-
      function(events) {
        let event = events[0] // assuming grouped events are similar enough
        event.end = events[events.length - 1].ts
        event.aggregation = 'count'
        event.duration = event.end - event.ts // milliseconds
        event.count = events.length
        if (event.error) {
          event.message = `${event.count}x: ${event.message}`
        } else if (event._agg.type === 'count-process') {
          event.message = events.map(i => i.message).join('\n')
        } else {
          // assume original message in aggregation key.
          event.message = `${event.count}x: ${event._agg.key}`
        }
        delete event._agg
        return event
      }
Plugin: cast
This plugin provides a simple way to convert fields into some common types. If a field doesn't exist in the event, it's ignored. Null values are preserved as null.
Config
Required:
- fields (object): A mapping of field names to their target types
Supported Types:
- int: Converts to integer
- float: Converts to floating-point number
- bool: Converts to boolean
- ts-sec: Converts seconds-since-epoch to moment timestamp
- ts-msec: Converts milliseconds-since-epoch to moment timestamp
- ts-usec: Converts microseconds-since-epoch to moment timestamp
Example
cast:
  config:
    fields:
      response_time: float
      status_code: int
      success: bool
      created_at: ts-sec
Plugin: drop
This plugin deletes configured fields from each log event that passes through it. Uses lodash's omit function to create a new event object without the specified fields.
Config
Required:
- fields (string[]): Array of field names to remove from events
Example
drop:
  config:
    fields:
      - _agg
      - password
      - large
      - debug_info
Plugin: fail2ban
This plugin monitors events for failures using a custom failed function. When a key exceeds its failure limit within the minutes time window, the ban function is called. Events pass through regardless of failure status. Provides access to the ipset command for IP-based banning.
Config
Required:
- failed ((e: LogEvent) => Failure): Must return {key: string, ts: number, limit: number} where ts is minutes since epoch, or null
- ban ((e: LogEvent, key: string) => boolean): Called when the limit is exceeded; must return true to mark as banned
- minutes (number): Time window for failure counting
Functions receive sandbox context with logbus, config, moment, get, set, and ipset.
Example
fail2ban:
  config:
    failures: {} # to avoid banning the same ip multiple times
    minutes: 5
    failed: !!js/function >-
      function(event) {
        let key
        let ts
        let limit
        if (event.ts && event.message) {
          ts = event.ts.unix() / 60 // convert to minutes since epoch
          let match
          if (this.get(event, 'source.ip') && this.get(event, 'event.action') === 'dropped' && this.get(event, 'event.dataset') === 'iptables') {
            // blacklist anything probing the firewall too much
            key = `iptables~${event.source.ip}`
            limit = 10
          } else if (event.process === 'sshd') {
            match = event.message.match(/invalid user (?<user>\S+) (?<ip>\S+) port (?<port>\d+)/ui)
            if (match) {
              const {user, ip, port} = match.groups
              this.set(event, 'client.ip', ip)
              this.set(event, 'client.port', port)
              this.set(event, 'labels.user', user)
              key = `ssh~${ip}`
              limit = 1
            }
          } else if (this.get(event, 'event.category', []).includes('web') && this.get(event, 'event.type', []).includes('access')) {
            // assuming web access log parsed by us
            const path = this.get(event, 'url.path', '')
            const status = this.get(event, 'http.response.status_code')
            if (path.endsWith('.env') || path.includes('\\x')) {
              key = `web~${event.client.ip}`
              limit = 1
            } else if (status >= 400 && status < 500) {
              key = `web~${event.client.ip}`
              limit = status !== 403 ? 5 : 10 // 5 failures is too easy to ban legit users
            }
          }
          if (key) {
            this.config.failures[key] = event
          }
        }
        return {key, ts, limit}
      }
    ban: !!js/function >-
      function ban(event, key) {
        if (!this.config.failures[key]) {
          return false
        }
        const [service, ip] = key.split('~')
        this.logbus.stats({banned: 1})
        this.ipset(['add', '-!', `blacklist-${service}`, ip], (err, stdout, stderr) => {
          this.logbus.log.warn({ip, service}, 'banned', stderr, stdout)
        })
        event.tags.push('banned')
        delete this.config.failures[key]
        return true
      }
Plugin: gc
This plugin triggers Node.js garbage collection periodically and emits an event with the duration of each collection. Requires Node.js to be started with the --expose-gc flag.
Config
Optional:
- intervalSeconds (number): Interval between garbage collections (default: 60)
Example
gc:
  config:
    intervalSeconds: 120
Plugin: geoip
This plugin queries a MaxMind GeoLite2 database to enrich events with geographic information. It extracts IP addresses using a custom ip function and enriches events using a custom enrich function. Events are always emitted downstream, whether enrichment succeeds or fails.
Config
Required:
- path (string): Path to MaxMind GeoLite2 database file
- ip (function): Extracts IP address from event
- enrich (function): Applies geodata to event
Optional:
- test (function): Mock function for testing (bypasses database lookup)
Functions receive sandbox context with get and set from lodash.
Example
geoip:
  config:
    path: /path/to/GeoLite2-City.mmdb
    ip: !!js/function >-
      function(event) {
        return this.get(event, 'client.ip') || this.get(event, 'source.ip') || event.ip
      }
    enrich: !!js/function >-
      function(event, results) {
        // https://www.elastic.co/guide/en/ecs/current/ecs-geo.html
        const geo = {}
        if (results.city) {
          geo.city_name = results.city.names.en
        }
        if (results.continent) {
          geo.continent_code = results.continent.code
        }
        if (results.country) {
          geo.country_iso_code = results.country.isoCode
          geo.country_name = results.country.names.en
        }
        if (results.postal) {
          geo.postal_code = results.postal.code
        }
        if (results.location) {
          geo.location = [results.location.longitude, results.location.latitude]
        }
        if (results.traits) {
          event.labels = Object.assign({}, event.labels, results.traits)
          delete event.labels.ipAddress
          delete event.labels.network
        }
        if (this.get(event, 'client.ip')) {
          event.client.geo = geo
        } else if (this.get(event, 'source.ip')) {
          event.source.geo = geo
        } else {
          event.geo = geo
        }
      }
Plugin: js
This plugin runs a user-defined function on each event. If the function returns a truthy value, it's emitted downstream. Null or undefined results filter out the event. The function receives a sandbox context:
- config: a reference to this stage's config
- util: the nodejs util module
- moment: the moment library
- hostname: the os.hostname() function
Config
Required:
- function ((e: LogEvent) => null | LogEvent): Transformation function to apply to events
Example
not-aggregated:
  module: js
  config:
    function: !!js/function >-
      function(event) {
        if (event._agg === undefined) {
          return event
        }
      }
Plugin: keep
This plugin creates new events containing only the configured fields. Each destination field can map to a single source field or an array of fallback sources (first defined value wins). Destination fields are always defined, set to null if no source has a value.
Config
Required:
- fields (Record<string, string | string[]>): Mapping of destination fields to source field(s)
Example
keep:
  config:
    fields:
      timestamp: [ts, timestamp, "@timestamp"]
      message: [msg, message, text]
      level: [severity, level]
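For example, given the config above, an input event with (illustrative values):
msg: hi
severity: 3
would be replaced by:
timestamp: null
message: hi
level: 3
message falls back to msg, level to severity, and timestamp is set to null because none of its sources are present.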
Plugin: rename
This plugin renames fields in events based on a configured mapping. Source fields that don't exist are ignored. After renaming, source fields are deleted from the event.
Config
Required:
- fields (Record<string, string>): Mapping of old field names to new field names
Example
rename:
  config:
    fields:
      old_name: new_name
      timestamp: ts
      msg: message