Apache Kafka is a distributed event store and stream-processing platform. Telegraf is a plug-in driven server agent for collecting and sending metrics and events from databases, systems and IoT sensors.

This integration lets you send metrics from your Apache Kafka HTTP server using the JMX to Prometheus exporter connected to Telegraf.

To send your Prometheus-format Apache Kafka metrics to Logz.io, you need to add the inputs.prometheus and outputs.http plug-ins to your Telegraf configuration file.

Setup instructions

Before you begin, you’ll need: Apache Kafka running on your server

Download the JMX to Prometheus exporter

Run all commands for this integration from your Apache Kafka directory.

Navigate to the Kafka directory on your host and run the following command:

wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar
Create a configuration file for the JMX to Prometheus exporter

Navigate to the Kafka directory on your host, create a file named kafka.yml with the following content:

lowercaseOutputName: true
 
 
rules:
# Special cases and very specific rules
- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
 name: kafka_server_$1_$2
 type: GAUGE
 labels:
   clientId: "$3"
   topic: "$4"
   partition: "$5"
- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
 name: kafka_server_$1_$2
 type: GAUGE
 labels:
   clientId: "$3"
   broker: "$4:$5"
- pattern : kafka.coordinator.(\w+)<type=(.+), name=(.+)><>Value
 name: kafka_coordinator_$1_$2_$3
 type: GAUGE
 
# Generic per-second counters with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
 name: kafka_$1_$2_$3_total
 type: COUNTER
 labels:
   "$4": "$5"
   "$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
 name: kafka_$1_$2_$3_total
 type: COUNTER
 labels:
   "$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
 name: kafka_$1_$2_$3_total
 type: COUNTER
 
- pattern: kafka.server<type=(.+), client-id=(.+)><>([a-z-]+)
 name: kafka_server_quota_$3
 type: GAUGE
 labels:
   resource: "$1"
   clientId: "$2"
 
- pattern: kafka.server<type=(.+), user=(.+), client-id=(.+)><>([a-z-]+)
 name: kafka_server_quota_$4
 type: GAUGE
 labels:
   resource: "$1"
   user: "$2"
   clientId: "$3"
 
# Generic gauges with 0-2 key/value pairs
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
 name: kafka_$1_$2_$3
 type: GAUGE
 labels:
   "$4": "$5"
   "$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
 name: kafka_$1_$2_$3
 type: GAUGE
 labels:
   "$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
 name: kafka_$1_$2_$3
 type: GAUGE
 
# Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
#
# Note that these are missing the '_sum' metric!
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
 name: kafka_$1_$2_$3_count
 type: COUNTER
 labels:
   "$4": "$5"
   "$6": "$7"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
 name: kafka_$1_$2_$3
 type: GAUGE
 labels:
   "$4": "$5"
   "$6": "$7"
   quantile: "0.$8"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
 name: kafka_$1_$2_$3_count
 type: COUNTER
 labels:
   "$4": "$5"
- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
 name: kafka_$1_$2_$3
 type: GAUGE
 labels:
   "$4": "$5"
   quantile: "0.$6"
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
 name: kafka_$1_$2_$3_count
 type: COUNTER
- pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
 name: kafka_$1_$2_$3
 type: GAUGE
 labels:
   quantile: "0.$4"
 
- pattern: 'java.lang<name=([\s\w]+), type=GarbageCollector, key=(\w+)>(.*): (\d+)'
 name: jvm_gc_$3
 labels:
   name: $1
   key: $2
 value: $4
 type: GAUGE
 
- pattern: 'java.lang<name=([\s\w]+), type=GarbageCollector>(.*): (\d+)'
 name: jvm_gc_$2
 labels:
   name: $1
 value: $3
 type: GAUGE
 
- pattern: kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count
 name: kafka_log_log_flush_rate_time
 help: Log flush rate and time (rate in seconds, latency|time in ms)
 type: UNTYPED
 
#kafka.connect:type=app-info,client-id="{clientid}"
#kafka.consumer:type=app-info,client-id="{clientid}"
#kafka.producer:type=app-info,client-id="{clientid}"
- pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>start-time-ms'
 name: kafka_$1_start_time_seconds
 labels:
   clientId: "$2"
 help: "Kafka $1 JMX metric start time seconds"
 type: GAUGE
 valueFactor: 0.001
- pattern: 'kafka.(.+)<type=app-info, client-id=(.+)><>(commit-id|version): (.+)'
 name: kafka_$1_$3_info
 value: 1
 labels:
   clientId: "$2"
   $3: "$4"
 help: "Kafka $1 JMX metric info version and commit-id"
 type: GAUGE
 
#kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
#kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
- pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(.+-total|compression-rate|.+-avg|.+-replica|.+-lag|.+-lead)
 name: kafka_$2_$6
 labels:
   clientId: "$3"
   topic: "$4"
   partition: "$5"
 help: "Kafka $1 JMX metric type $2"
 type: GAUGE
 
#kafka.producer:type=producer-topic-metrics,client-id="{clientid}",topic="{topic}"
#kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}",topic="{topic}"", partition="{partition}"
- pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), topic=(.+)><>(.+-total|compression-rate|.+-avg)
 name: kafka_$2_$5
 labels:
   clientId: "$3"
   topic: "$4"
 help: "Kafka $1 JMX metric type $2"
 type: GAUGE
 
#kafka.connect:type=connect-node-metrics,client-id="{clientid}",node-id="{nodeid}"
#kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id="{nodeid}"
- pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.+), node-id=(.+)><>(.+-total|.+-avg)
 name: kafka_$2_$5
 labels:
   clientId: "$3"
   nodeId: "$4"
 help: "Kafka $1 JMX metric type $2"
 type: UNTYPED
 
#kafka.connect:type=kafka-metrics-count,client-id="{clientid}"
#kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{clientid}"
#kafka.consumer:type=consumer-coordinator-metrics,client-id="{clientid}"
#kafka.consumer:type=consumer-metrics,client-id="{clientid}"
- pattern: kafka.(.+)<type=(.+)-metrics, client-id=(.*)><>(.+-total|.+-avg|.+-bytes|.+-count|.+-ratio|.+-age|.+-flight|.+-threads|.+-connectors|.+-tasks|.+-ago)
 name: kafka_$2_$4
 labels:
   clientId: "$3"
 help: "Kafka $1 JMX metric type $2"
 type: GAUGE
 
#kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}<> status"
- pattern: 'kafka.connect<type=connector-task-metrics, connector=(.+), task=(.+)><>status: ([a-z-]+)'
 name: kafka_connect_connector_status
 value: 1
 labels:
   connector: "$1"
   task: "$2"
   status: "$3"
 help: "Kafka Connect JMX Connector status"
 type: GAUGE
 
#kafka.connect:type=task-error-metrics,connector="{connector}",task="{task}"
#kafka.connect:type=source-task-metrics,connector="{connector}",task="{task}"
#kafka.connect:type=sink-task-metrics,connector="{connector}",task="{task}"
#kafka.connect:type=connector-task-metrics,connector="{connector}",task="{task}"
- pattern: kafka.connect<type=(.+)-metrics, connector=(.+), task=(.+)><>(.+-total|.+-count|.+-ms|.+-ratio|.+-avg|.+-failures|.+-requests|.+-timestamp|.+-logged|.+-errors|.+-retries|.+-skipped)
 name: kafka_connect_$1_$4
 labels:
   connector: "$2"
   task: "$3"
 help: "Kafka Connect JMX metric type $1"
 type: GAUGE
 
#kafka.connect:type=connector-metrics,connector="{connector}"
#kafka.connect:type=connect-worker-metrics,connector="{connector}"
- pattern: kafka.connect<type=connect-worker-metrics, connector=(.+)><>([a-z-]+)
 name: kafka_connect_worker_$2
 labels:
   connector: "$1"
 help: "Kafka Connect JMX metric $1"
 type: GAUGE
 
#kafka.connect:type=connect-worker-metrics
- pattern: kafka.connect<type=connect-worker-metrics><>([a-z-]+)
 name: kafka_connect_worker_$1
 help: "Kafka Connect JMX metric worker"
 type: GAUGE
 
#kafka.connect:type=connect-worker-rebalance-metrics
- pattern: kafka.connect<type=connect-worker-rebalance-metrics><>([a-z-]+)
 name: kafka_connect_worker_rebalance_$1
 help: "Kafka Connect JMX metric rebalance information"
 type: GAUGE
Start Zookeeper

Start the Zookeeper as follows:

bin/zookeeper-server-start.sh config/zookeeper.properties
Start Apache Kafka with the JMX to Prometheus exporter

Open a new terminal and run the following command:

KAFKA_OPTS="`$KAFKA_OPTS` -javaagent:`PWD`/jmx_prometheus_javaagent-0.16.1.jar=9100:`PWD`/kafka.yml" bin/kafka-server-start.sh config/server.properties
Set up Telegraf v1.17 on a dedicated machine
For Windows:
wget https://dl.influxdata.com/telegraf/releases/telegraf-1.19.2_windows_amd64.zip

After downloading the archive, extract its content into C:\Program Files\Logzio\telegraf\.

The configuration file is located at C:\Program Files\Logzio\telegraf\.

For MacOS:
brew install telegraf

The configuration file is located at /usr/local/etc/telegraf.conf.

For Linux:

Ubuntu & Debian

sudo apt-get update && sudo apt-get install telegraf

The configuration file is located at /etc/telegraf/telegraf.conf.

RedHat and CentOS

sudo yum install telegraf

The configuration file is located at /etc/telegraf/telegraf.conf.

SLES & openSUSE

# add go repository
zypper ar -f obs://devel:languages:go/ go
# install latest telegraf
zypper in telegraf

The configuration file is located at /etc/telegraf/telegraf.conf.

FreeBSD/PC-BSD

sudo pkg install telegraf

The configuration file is located at /etc/telegraf/telegraf.conf.

Add the inputs.prometheus plug-in

First you need to configure the input plug-in to enable Telegraf to scrape data from your hosts. To do this, add the below code to the configuration file.

[[inputs.prometheus]]
 urls = ["http://localhost:9100/metrics"]
 response_timeout = "10s"
Add the outputs.http plug-in

After you create the configuration file, configure the output plug-in to enable Telegraf to send your data to Logz.io in Prometheus-format. To do this, add the following code to the configuration file:

[[outputs.http]]
  url = "https://<<LISTENER-HOST>>:8053"
  data_format = "prometheusremotewrite"
  [outputs.http.headers]
     Content-Type = "application/x-protobuf"
     Content-Encoding = "snappy"
     X-Prometheus-Remote-Write-Version = "0.1.0"
     Authorization = "Bearer <<PROMETHEUS-METRICS-SHIPPING-TOKEN>>"

Replace the placeholders to match your specifics. (They are indicated by the double angle brackets << >>):

  • Replace <<PROMETHEUS-METRICS-SHIPPING-TOKEN>> with a token for the Metrics account you want to ship to.
    Here’s how to look up your Metrics token.
  • Replace <<LISTENER-HOST>> with the Logz.io Listener URL for your region, configured to use port 8052 for http traffic, or port 8053 for https traffic. For example, listener.logz.io if your account is hosted on AWS US East, or listener-nl.logz.io if hosted on Azure West Europe.
Start Telegraf
On Windows:
telegraf.exe --service start
On MacOS:
telegraf --config telegraf.conf
On Linux:

Linux (sysvinit and upstart installations)

sudo service telegraf start

Linux (systemd installations)

systemctl start telegraf
Check Logz.io for your metrics

Give your data some time to get from your system to ours, then log in to your Logz.io Metrics account, and open the Logz.io Metrics tab.