
Plugin Documentation

Sources

Plugin: tail-file

This plugin watches files for changes and emits new records. Tracks file positions and inodes in a state file to handle rotations. Re-evaluates globs periodically to discover new files. Supports starting from beginning or end of files.

Config

Required:

  • globs (string[]): Array of glob patterns for files to tail

Optional:

  • separator (string): Record delimiter (default: \n)
  • encoding (string): File encoding (default: utf8; options: utf8, ascii, hex, base64, binary)
  • trim (boolean): Strip whitespace from records
  • db (string): State file path (default: {STAGE}.taildb)
  • intervalSeconds (number): Glob re-evaluation interval (default: 60)
  • tailInterval (number): File check interval in milliseconds
  • fromStart (boolean): Read new files from beginning instead of end

Example

tail-file:
  config:
    globs:
      - /var/log/app/*.log
    separator: "\n"
    encoding: utf8
    db: ./.logbus-state.json
    intervalSeconds: 60
    fromStart: true

Plugin: read-file

This plugin reads files at startup, splitting content by separator and emitting records. Supports glob patterns, multiple encodings, and throttled reading. Pipeline shuts down after all files are consumed. For continuous file monitoring, use tail-file instead.

Config

Required:

  • globs (string[]): Array of glob patterns for files to read

Optional:

  • separator (string): Record delimiter (default: \n)
  • encoding (string): File encoding (default: utf8; options: utf8, ascii, hex, base64)
  • trim (boolean): Strip whitespace from records
  • maxMbps (number): Throttle rate in MB/s (default: 100)

Example

read-file:
  config:
    globs:
      - /var/log/app/*.log
      - /var/log/system.log
    separator: "\n"
    encoding: utf8
    trim: true
    maxMbps: 50

Plugin: read-http

This plugin creates an HTTP server that listens for requests and emits them as events. JSON bodies are parsed automatically based on the Content-Type header. Each request is emitted with its method, headers, query parameters, and body.
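
A hypothetical sketch of an emitted event; the exact field names are inferred from the description above rather than confirmed:

method: POST
headers:
  content-type: application/json
query:
  debug: "1"
body:
  message: hello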

Config

Optional:

  • port (number): Port to listen on (default: 8080)
  • host (string): Host to bind to (default: 0.0.0.0)

Example

read-http:
  config:
    port: 3000
    host: localhost

Plugin: read-journal

This plugin queries the systemd journal at intervals and emits entries as events. Tracks last cursor/timestamp in a state file to avoid re-processing. Supports starting from a relative time on first run.

Config

Optional:

  • journalctl (string): Path to journalctl binary (default: journalctl)
  • intervalSeconds (number): Query interval (default: 60)
  • db (string): State file path (default: logbus-journal-db)
  • sinceMinutes (number): Look back this many minutes on first run

Example

read-journal:
  config:
    sinceMinutes: 2880 # 48 hours
    intervalSeconds: 10
    db: .logbus-state.json

Plugin: read-kafka

This plugin connects to Kafka brokers and consumes messages from a specified topic. Messages are emitted as events with the value in the payload field. Supports consumer groups and optional max message limits.

Config

Required:

  • brokers (string[]): Array of Kafka broker addresses
  • topic (string): Topic to consume from
  • group (string): Consumer group ID

Optional:

  • client (string): Kafka client ID (default: logbus)
  • max (number): Maximum messages to consume before stopping
  • replay (boolean): Start from beginning of topic

Example

read-kafka:
  config:
    brokers:
      - kafka1:9092
      - kafka2:9092
    topic: logs
    group: logbus-consumer
    client: logbus-instance-1

Plugin: read-stdin

This plugin reads from stdin, splits input by a matcher pattern, and emits each line as an event. Supports throttling and custom encodings. By default, shuts down the pipeline when stdin closes.

Config

Optional:

  • encoding (string): Input encoding (default: utf8; options: utf8, ascii, hex, base64, binary)
  • matcher (string | regex): Line separator pattern (default: \r?\n)
  • maxMbps (number): Throttle rate in MB/s (default: 100)
  • stopOnEOF (boolean): Shutdown pipeline on EOF (default: true)

Example

read-stdin:
  config:
    encoding: utf8
    matcher: "\n"
    maxMbps: 50
    stopOnEOF: false

Plugin: query-elasticsearch

This plugin executes a search against Elasticsearch and emits each hit as an event. Uses the scroll API to paginate through results. Supports dynamic index patterns and custom search bodies. Pipeline shuts down when results are exhausted or max hits reached.

Config

Required:

  • index (string | function): Index pattern to search
  • search (object): Search body (Elasticsearch query DSL)

Optional:

  • endpoint (string | function): Elasticsearch endpoint (default: http://localhost:9200)
  • scroll (string): Scroll context timeout (default: 1m)
  • max (number): Maximum hits to process (default: 0 = unlimited)
  • ssl.ca (string): Path to CA certificate file

Example

query-elasticsearch:
  config:
    index: logs-*
    scroll: 1m
    search:
      size: 333
      query:
        range:
          timestamp:
            gte: now-1d
    endpoint: http://localhost:9200
    ssl:
      ca: ~/.certs/ca.pem

Sinks

Plugin: index-elasticsearch

This plugin buffers events and ships them to Elasticsearch using the /_bulk endpoint. Shipping occurs when the buffer reaches a size limit or at regular intervals. Supports custom SSL certificates and dynamic endpoint configuration.

Config

Optional:

  • endpoint (string | function): Elasticsearch endpoint (default: http://localhost:9200)
  • index (string): Default index name (default: logbus)
  • bufferSize (number): Events to buffer before shipping (default: 1000)
  • intervalSeconds (number): Interval between shipments (default: 60)
  • ssl.ca (string): Path to CA certificate file

Event fields _index, _id, and _type are used for document metadata if present.
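
For instance, a hypothetical upstream event like this (all values illustrative) would be indexed into logs-2025.10 under document id abc123:

_index: logs-2025.10
_id: abc123
message: user login failed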

Example

NOTE: This plugin will emit the response from the Elasticsearch bulk index request as downstream events, so it is important to declare it as an output stage with outChannels: [].

index-elasticsearch:
  outChannels: []
  config:
    intervalSeconds: 10
    bufferSize: 1000
    endpoint: http://localhost:9200
    # endpoint: !!js/function >-
    #   function(event) {
    #     return `https://${process.env.ES_USER}:${process.env.ES_PASS}@your-site`
    #   }
    ssl:
      ca: ~/.certs/ca.pem

Plugin: write-file

This plugin writes the payload field of events to a file. Supports multiple encodings and can either append or overwrite. Does not propagate events downstream (sets outChannels to an empty array).

Config

Required:

  • path (string): File path (supports ~ expansion)

Optional:

  • encoding (string): Output encoding (default: utf8; options: utf8, ascii, hex, base64, binary)
  • append (boolean): Append to file instead of overwriting

Example

write-file:
  config:
    path: ~/logs/output.log
    encoding: utf8
    append: true

Plugin: write-http

This plugin posts event payloads to an HTTP/HTTPS URL. Supports custom headers and methods. Waits for all in-flight requests to complete before shutting down.

Config

Required:

  • url (string): Target HTTP endpoint

Optional:

  • method (string): HTTP method (default: POST; options: GET, POST, DELETE, PUT, PATCH)
  • headers (object): HTTP headers to include in each request
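
Example

A hypothetical configuration; the URL and header values are illustrative:

write-http:
  config:
    url: https://collector.example.com/ingest
    method: POST
    headers:
      Content-Type: application/json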

Plugin: send-email

This plugin sends events as email messages. The event payload becomes the email body. Supports authenticated and unauthenticated SMTP. Waits for all in-flight emails to be sent before shutting down.

Config

Required:

  • to (string): Recipient email address
  • from (string): Sender email address
  • subject (string): Email subject
  • host (string): SMTP server hostname

Optional:

  • port (number): SMTP port (default: 587)
  • user (string): SMTP username
  • password (string): SMTP password

Example

send-email:
  config:
    to: alerts@example.com
    from: logbus@example.com
    subject: Alert from LogBus
    host: smtp.example.com
    port: 587
    user: logbus
    password: secret

Observability

Plugin: log

This plugin writes events to the LogBus internal Bunyan logger. It supports standard log levels and can extract level information from event fields. Adds hostname if not present. This plugin sets outChannels to an empty array, preventing downstream propagation.

Config

Optional:

  • defaultLevel ('trace' | 'debug' | 'info' | 'warn' | 'error' | 'fatal'): Default log level if not specified in event (default: info)
  • extra (Record<string, unknown>): Additional metadata to attach to all log entries

The log level is read from the event's level or severity field when present. Those values are mapped to {trace...fatal} as follows:

  • Bunyan format: 10-60 (trace through fatal)
  • Syslog format: 1-7 (fatal through trace)
  • Named: trace, debug, info, warn, warning, error, err, fatal

Example

log:
  inChannels:
    - log-errors
    - log-stats
  config:
    defaultLevel: warn
    extra:
      env: prod

Plugin: errors

This plugin collects errors by stage and message, keeping only a sample of each unique error. Errors are aggregated and emitted periodically to minimize memory consumption and avoid flooding logs with duplicate error messages.

Config

Optional:

  • intervalSeconds (number): How often to emit aggregated errors (default: 60)
  • stackDepth (number): Number of stack trace levels to retain (default: 3)

Example

NOTE: There are two reserved channel names: stats & errors, so it is important to use a pipeline stage name other than errors to avoid a clash.

log-errors:
  module: errors
  inChannels:
    - errors
  config:
    intervalSeconds: 300
    stackDepth: 3

Plugin: stats

This plugin collects metrics from other pipeline stages and emits aggregated statistics events. Tracks standard metrics (errors, events, bytes, lines) and supports custom metrics. Includes memory usage information.

Config

Optional:

  • intervalSeconds (number): Emit interval (default: 15)
  • extra (string[]): Additional custom metric names to track

Standard Metrics:

  • errors, rxEvents, txEvents, rxBytes, txBytes, rxLines, txLines

Example

NOTE: There are two reserved channel names: stats & errors, so it is important to use a pipeline stage name other than stats to avoid a clash.

log-stats:
  module: stats
  inChannels:
    - stats
  config:
    intervalSeconds: 300

Plugin: sample

This plugin emits events periodically based on count (every nth event), time (every N seconds), or both.

Config

Required (at least one):

  • nth (number): Emit every nth event
  • intervalSeconds (number): Emit every N seconds

Example

sample:
  config:
    nth: 100
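
A sketch using both triggers together, since the plugin supports combining them (values illustrative):

sample:
  config:
    nth: 100
    intervalSeconds: 60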

Plugin: tap

This plugin gates event flow using a custom function that returns true/false. The function is evaluated at startup and periodically thereafter. Events only flow downstream when the tap is "on".

Config

Required:

  • tap (() => boolean | Promise): Function returning boolean to control event flow

Optional:

  • intervalSeconds (number): Re-evaluation interval (default: 60)

Example

tap:
  config:
    tap: !!js/function >-
      function() {
        const hour = new Date().getHours()
        return hour >= 9 && hour <= 17
      }
    intervalSeconds: 300

Plugin: parse-otel

This plugin processes OTLP metrics from HTTP requests, supporting both JSON and Protobuf formats. Each datapoint is emitted as a separate LogBus event. Supports Gauge, Sum, Histogram, and Summary metric types.

Status: Work in Progress

Config

No configuration required.

Example

parse-otel:
  config: {}

Parsing & Serialization

Plugin: parse-json

This plugin attempts to parse the payload field as JSON. On success, it emits the parsed object. On failure, it logs an error with surrounding text context to aid troubleshooting.

Config

No configuration required.

Example

parse-json:
  config: {}

Plugin: parse-lines

This plugin splits incoming text by a separator (default newline) and emits each line as a separate event. Maintains a buffer for incomplete lines across chunks. Lines exceeding maxSize are truncated.

Config

Optional:

  • separator (string): Line separator (default: \n)
  • maxSize (number): Maximum line length in bytes (default: 65536)

Example

parse-lines:
  config:
    separator: "\n"
    maxSize: 1000

Plugin: parse-yaml

This plugin attempts to parse the payload field as YAML. On success, it emits the parsed object. On failure, it logs an error.

Config

No configuration required.

Example

parse-yaml:
  config: {}

Plugin: serialize-json

This plugin converts events to JSON strings and places them in the payload field. Supports custom indentation and line delimiters.

Config

Optional:

  • indent (number): Spaces for indentation (default: 0 = compact)
  • delimiter (string): String to append after JSON (default: \n)

Example

serialize-json:
  config:
    indent: 0
    delimiter: ""
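
With the default delimiter of \n and indent of 0, each event instead becomes one compact JSON document per line (newline-delimited JSON), which suits line-oriented sinks.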

Plugin: serialize-yaml

This plugin converts upstream events to YAML and emits them downstream as {payload: yaml(event)}.

Config

No configuration required.

Example

serialize-yaml:
  config: {}

Miscellaneous

Plugin: agg

This plugin collects events into buckets and emits aggregated events when conditions are met. It supports flexible grouping via key functions, filtering to select which events to aggregate, and custom view functions to transform buckets into output events. Buckets can be completed by size limits, time spans, or custom start/stop markers.

Config

Required:

  • filtered (function): Returns true for events that should NOT be aggregated
  • key (function): Returns the grouping key for an event
  • view (function): Transforms a bucket array into a single aggregated event

Optional:

  • tsField (string): Timestamp field name (default: ts)
  • start (function): Returns true when an event should start a new bucket
  • stop (function): Returns true when an event should complete a bucket
  • maxSize (number | function): Maximum events per bucket (default: 1000)
  • maxEventSeconds (number | function): Maximum time span between first and last event in seconds (default: 300)
  • maxRealSeconds (number | function): Interval for flushing all buckets in seconds (default: 300)

Functions receive a sandbox context with config, util, moment, and hostname.

Example

This example assumes that an upstream stage has marked events that should be aggregated with an _agg object.

count-aggregated:
  module: agg
  config:
    maxSize: 1000
    filtered: !!js/function >-
      function(event) {
        return !event._agg?.type?.startsWith('count')
      }
    start: !!js/function >-
      function(event) {
        return true // let size or time trigger aggregation
      }
    stop: !!js/function >-
      function(event) {
        return false // let size or time trigger aggregation
      }
    key: !!js/function >-
      function(event) {
        return event._agg.key
      }
    view: !!js/function >-
      function(events) {
        let event = events[0] // assuming grouped events are similar enough
        event.end = events[events.length-1].ts
        event.aggregation = 'count'
        event.duration = event.end - event.ts // milliseconds
        event.count = events.length
        if (event.error) {
          event.message = `${event.count}x: ${event.message}`
        } else if (event._agg.type === 'count-process') {
          event.message = events.map(i => i.message).join('\n')
        } else {
          // assume original message in aggregation key.
          event.message = `${event.count}x: ${event._agg.key}`
        }
        delete event._agg
        return event
      }

Plugin: cast

This plugin provides a simple way to convert fields into some common types. If a field doesn't exist in the event, it's ignored. Null values are preserved as null.

Config

Required:

  • fields (object): A mapping of field names to their target types

Supported Types:

  • int - Converts to integer
  • float - Converts to floating-point number
  • bool - Converts to boolean
  • ts-sec - Converts seconds-since-epoch to moment timestamp
  • ts-msec - Converts milliseconds-since-epoch to moment timestamp
  • ts-usec - Converts microseconds-since-epoch to moment timestamp

Example

cast:
  config:
    fields:
      response_time: float
      status_code: int
      success: bool
      created_at: ts-sec

Plugin: drop

This plugin deletes configured fields from each log event that passes through it. Uses lodash's omit function to create a new event object without the specified fields.

Config

Required:

  • fields (string[]): Array of field names to remove from events

Example

drop:
  config:
    fields:
      - _agg
      - password
      - large
      - debug_info

Plugin: fail2ban

This plugin monitors events for failures using a custom failed function. When a key exceeds its failure limit within the configured minutes time window, the ban function is called. Events pass through regardless of failure status. Provides access to the ipset command for IP-based banning.

Config

Required:

  • failed ((e: LogEvent) => Failure | null): Returns {key: string, ts: number, limit: number} for a failing event, where ts is minutes since epoch; returns null otherwise
  • ban ((e: LogEvent, key: string) => boolean): Called when the limit is exceeded; must return true to mark the event as banned
  • minutes (number): Time window for failure counting

Functions receive a sandbox context with logbus, config, moment, get, set, and ipset.

Example

fail2ban:
  config:
    failures: {} # to avoid banning the same ip multiple times
    minutes: 5
    failed: !!js/function >-
      function(event) {
        let key
        let ts
        let limit
        if (event.ts && event.message) {
          ts = event.ts.unix() / 60 // convert to minutes since epoch
          let match
          if (this.get(event, 'source.ip') && this.get(event, 'event.action') === 'dropped' && this.get(event, 'event.dataset') === 'iptables') {
            // blacklist anything probing the firewall too much
            key = `iptables~${event.source.ip}`
            limit = 10
          } else if (event.process === 'sshd') {
            match = event.message.match(/invalid user (?<user>\S+) (?<ip>\S+) port (?<port>\d+)/ui)
            if (match) {
              const {user, ip, port} = match.groups
              this.set(event, 'client.ip', ip)
              this.set(event, 'client.port', port)
              this.set(event, 'labels.user', user)
              key = `ssh~${ip}`
              limit = 1
            }
          } else if (this.get(event, 'event.category', []).includes('web') && this.get(event, 'event.type', []).includes('access')) {
            // assuming web access log parsed by us
            const path = this.get(event, 'url.path', '')
            const status = this.get(event, 'http.response.status_code')
            if (path.endsWith('.env') || path.includes('\\x')) {
              key = `web~${event.client.ip}`
              limit = 1
            } else if (status >= 400 && status < 500) {
              key = `web~${event.client.ip}`
              limit = status !== 403 ? 5 : 10 // 5 failures is too easy to ban legit users
            }
          }
          if (key) {
            this.config.failures[key] = event
          }
        }
        return {key, ts, limit}
      }
    ban: !!js/function >-
      function ban(event, key) {
        if (!this.config.failures[key]) {
          return false
        }
        const [service, ip] = key.split('~')
        this.logbus.stats({banned: 1})
        this.ipset(['add', '-!', `blacklist-${service}`, ip], (err, stdout, stderr) => {
          this.logbus.log.warn({ip, service}, 'banned', stderr, stdout)
        })
        event.tags.push('banned')
        delete this.config.failures[key]
        return true
      }

Plugin: gc

This plugin triggers Node.js garbage collection periodically and emits an event with the duration of each collection. Requires Node.js to be started with the --expose-gc flag.

Config

Optional:

  • intervalSeconds (number): Interval between garbage collections (default: 60)

Example

gc:
  config:
    intervalSeconds: 120

Plugin: geoip

This plugin queries a MaxMind GeoLite2 database to enrich events with geographic information. It extracts IP addresses using a custom ip function and enriches events using a custom enrich function. Events are always emitted downstream, whether enrichment succeeds or fails.

Config

Required:

  • path (string): Path to MaxMind GeoLite2 database file
  • ip (function): Extracts IP address from event
  • enrich (function): Applies geodata to event

Optional:

  • test (function): Mock function for testing (bypasses database lookup)

Functions receive a sandbox context with get and set from lodash.

Example

geoip:
  config:
    path: /path/to/GeoLite2-City.mmdb
    ip: !!js/function >-
      function(event) {
        return this.get(event, 'client.ip') || this.get(event, 'source.ip') || event.ip
      }
    enrich: !!js/function >-
      function(event, results) {
        // https://www.elastic.co/guide/en/ecs/current/ecs-geo.html
        const geo = {}
        if (results.city) {
          geo.city_name = results.city.names.en
        }
        if (results.continent) {
          geo.continent_code = results.continent.code
        }
        if (results.country) {
          geo.country_iso_code = results.country.isoCode
          geo.country_name = results.country.names.en
        }
        if (results.postal) {
          geo.postal_code = results.postal.code
        }
        if (results.location) {
          geo.location = [results.location.longitude, results.location.latitude]
        }
        if (results.traits) {
          // merge remaining MaxMind traits into labels
          event.labels = Object.assign({}, event.labels, results.traits)
          delete event.labels.ipAddress
          delete event.labels.network
        }
        if (this.get(event, 'client.ip')) {
          event.client.geo = geo
        } else if (this.get(event, 'source.ip')) {
          event.source.geo = geo
        } else {
          event.geo = geo
        }
      }

Plugin: js

This plugin runs a user-defined function on each event. If the function returns a truthy value, that value is emitted downstream; a null or undefined result filters out the event. The function receives a sandbox context:

  • config: a reference to this stage's config
  • util: the nodejs util module
  • moment: the moment library
  • hostname: the os.hostname() function

Config

Required:

  • function ((e: LogEvent) => null | LogEvent): Transformation function to apply to events

Example

not-aggregated:
  module: js
  config:
    function: !!js/function >-
      function(event) {
        if (event._agg === undefined) {
          return event
        }
      }

Plugin: keep

This plugin creates new events containing only the configured fields. Each destination field can map to a single source field or an array of fallback sources (first defined value wins). Destination fields are always defined, set to null if no source has a value.

Config

Required:

  • fields (Record<string, string | string[]>): Mapping of destination fields to source field(s)

Example

keep:
  config:
    fields:
      timestamp: [ts, timestamp, "@timestamp"]
      message: [msg, message, text]
      level: [severity, level]
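
With the config above, a hypothetical input event of {msg: "disk full", severity: "error"} would produce:

timestamp: null
message: disk full
level: error

Here timestamp is null because none of its sources were present, and level comes from severity, the first defined fallback.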

Plugin: rename

This plugin renames fields in events based on a configured mapping. Source fields that don't exist are ignored. After renaming, source fields are deleted from the event.

Config

Required:

  • fields (Record<string, string>): Mapping of old field names to new field names

Example

rename:
  config:
    fields:
      old_name: new_name
      timestamp: ts
      msg: message