tutorial

How to Monitor Apache Kafka Health and Consumer Lag with Vigilmon

Kafka broker failures and consumer group lag spikes kill event-driven pipelines silently. Learn how to monitor Kafka broker health, topic consumer lag, and consumer application liveness with Vigilmon HTTP probes and heartbeat monitors.

Apache Kafka is the backbone of event-driven architectures — but a broker failure, an under-replicated partition, or a consumer group falling behind can silently stall your entire data pipeline. Unlike HTTP services, Kafka failures don't return 500 errors; they just stop processing. Events pile up in the topic, consumer lag climbs, and downstream services get nothing — often for minutes or hours before anyone notices.

Vigilmon gives you external visibility into Kafka health through HTTP probe monitoring (via Kafka REST Proxy or Confluent health APIs) and heartbeat monitoring for your consumer applications. This tutorial covers both.


Why Kafka Needs External Monitoring

Standard Kafka tooling (JMX metrics, Kafka Manager, Confluent Control Center) provides rich internal metrics — but only if you're watching the dashboard. External monitoring with Vigilmon adds:

  • Proactive alerting when broker health endpoints return non-200
  • Consumer lag spike detection via a health sidecar that queries the __consumer_offsets topic
  • Heartbeat monitoring so you know immediately when a consumer application stops processing — even if the broker itself is healthy
  • Multi-region availability checking from outside your network perimeter

These layers work together: a broker can be healthy while a consumer is silently stuck, and a consumer can be healthy while the broker it reads from has lost partition leadership.


Step 1: Build a Kafka Health Endpoint

Kafka brokers expose JMX metrics but not HTTP. You need a lightweight health sidecar — or expose health via Kafka REST Proxy (Confluent) or a custom endpoint in your consumer application.

Node.js Health Sidecar (kafka-node / kafkajs)

// health/kafka.js
const express = require('express');
const { Kafka } = require('kafkajs');

const app = express();
const kafka = new Kafka({
  clientId: 'health-checker',
  brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
});

const admin = kafka.admin();

app.get('/health/kafka', async (req, res) => {
  try {
    await admin.connect();

    // Check cluster metadata — fails if broker is unreachable
    const metadata = await admin.fetchTopicMetadata({ topics: [] });
    const brokerCount = metadata.brokers.length;

    // Check for under-replicated partitions in monitored topics
    const topics = (process.env.KAFKA_MONITOR_TOPICS || '').split(',').filter(Boolean);
    let underReplicated = [];
    if (topics.length > 0) {
      const topicMeta = await admin.fetchTopicMetadata({ topics });
      for (const topic of topicMeta.topics) {
        for (const partition of topic.partitions) {
          if (partition.isr.length < partition.replicas.length) {
            underReplicated.push(`${topic.name}:${partition.partitionId}`);
          }
        }
      }
    }

    await admin.disconnect();

    if (underReplicated.length > 0) {
      return res.status(503).json({
        status: 'degraded',
        reason: 'under_replicated_partitions',
        partitions: underReplicated,
      });
    }

    return res.status(200).json({ status: 'ok', brokers: brokerCount });
  } catch (err) {
    return res.status(503).json({ status: 'down', error: err.message });
  }
});

app.listen(3004);

Consumer Lag Health Endpoint

Add a separate endpoint that checks consumer group lag:

app.get('/health/kafka/lag', async (req, res) => {
  try {
    await admin.connect();

    const groupId = process.env.KAFKA_CONSUMER_GROUP;
    const offsets = await admin.fetchOffsets({ groupId, topics: [] });
    const topicOffsets = await admin.fetchTopicOffsets(process.env.KAFKA_TOPIC);

    let maxLag = 0;
    for (const partitionOffset of topicOffsets) {
      const consumerOffset = offsets.find(
        o => o.partition === partitionOffset.partition
      );
      if (consumerOffset) {
        const lag = parseInt(partitionOffset.high) - parseInt(consumerOffset.offset);
        maxLag = Math.max(maxLag, lag);
      }
    }

    await admin.disconnect();

    const lagThreshold = parseInt(process.env.KAFKA_LAG_THRESHOLD || '1000');
    if (maxLag > lagThreshold) {
      return res.status(503).json({
        status: 'degraded',
        reason: 'consumer_lag_spike',
        max_lag: maxLag,
        threshold: lagThreshold,
      });
    }

    return res.status(200).json({ status: 'ok', max_lag: maxLag });
  } catch (err) {
    return res.status(503).json({ status: 'down', error: err.message });
  }
});

Using Confluent REST Proxy

If you run Confluent Platform, the REST Proxy exposes a cluster health endpoint you can probe directly:

# Confluent REST Proxy cluster health
curl -i http://localhost:8082/v3/clusters

# Confluent Schema Registry health
curl -i http://localhost:8081/

# Kafka Connect health
curl -i http://localhost:8083/connectors

Point your Vigilmon monitor at whichever of these matches your deployment.


Step 2: Configure Vigilmon HTTP Monitor for Kafka

  1. Log in to vigilmon.online and go to Monitors → New Monitor
  2. Choose HTTP / HTTPS
  3. Set the URL to your Kafka health endpoint: https://your-app.example.com/health/kafka
  4. Set the check interval to 1 minute
  5. Under Expected response, configure:
    • Status code: 200
    • Response body contains: "status":"ok"
    • Response time threshold: 5000ms (Kafka metadata fetch can take a moment)
  6. Under Alert channels, assign your Slack or PagerDuty channel
  7. Save the monitor

Add a second monitor for consumer lag:

  • URL: https://your-app.example.com/health/kafka/lag
  • Expected: 200, body contains "status":"ok"
  • Interval: 1 minute
  • Alert channel: separate Slack channel for lag alerts (often P2 vs P1 for broker failures)

Vigilmon's multi-region probe consensus prevents lag spike alerts from firing on transient calculation delays.


Step 3: Heartbeat Monitoring for Kafka Consumer Applications

Broker health and topic lag are necessary — but not sufficient. A consumer application can be stuck in an error retry loop, blocked on a downstream dependency, or quietly paused due to a partition rebalance, while the broker metrics look perfectly healthy.

Vigilmon heartbeat monitors detect silent consumer stalls: your consumer sends a ping to Vigilmon after each successful message batch. If pings stop, Vigilmon fires an alert.

Set Up the Heartbeat Monitor

  1. In Vigilmon, go to Monitors → New Monitor → Heartbeat
  2. Set the name: kafka-order-processor-consumer
  3. Set the expected interval: 5 minutes (adjust to your message throughput)
  4. Set the grace period: 10 minutes
  5. Save — copy the heartbeat URL, e.g. https://vigilmon.online/heartbeat/abc123xyz

Wire It Into Your Consumer (KafkaJS)

// consumer.js
const { Kafka } = require('kafkajs');
const axios = require('axios');

const kafka = new Kafka({
  clientId: 'order-processor',
  brokers: process.env.KAFKA_BROKERS.split(','),
});

const consumer = kafka.consumer({ groupId: 'order-processor-group' });

let messagesSinceLastHeartbeat = 0;
const HEARTBEAT_EVERY_N_MESSAGES = 100;

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: false });

  await consumer.run({
    eachBatch: async ({ batch, heartbeat }) => {
      for (const message of batch.messages) {
        await processMessage(message);
        messagesSinceLastHeartbeat++;

        // Ping Vigilmon every N messages to prove liveness
        if (messagesSinceLastHeartbeat >= HEARTBEAT_EVERY_N_MESSAGES) {
          await axios.get(process.env.VIGILMON_HEARTBEAT_URL).catch(() => {});
          messagesSinceLastHeartbeat = 0;
        }

        // Also call Kafka's internal heartbeat to avoid session timeout
        await heartbeat();
      }
    },
  });
}

run();

For low-throughput consumers where messages arrive infrequently, send the heartbeat on a timer instead:

setInterval(async () => {
  if (lastProcessedAt && Date.now() - lastProcessedAt < 5 * 60 * 1000) {
    await axios.get(process.env.VIGILMON_HEARTBEAT_URL).catch(() => {});
  }
}, 60 * 1000);

Python Consumer (confluent-kafka)

# consumer.py
from confluent_kafka import Consumer
import requests, os, time

conf = {
    'bootstrap.servers': os.environ['KAFKA_BROKERS'],
    'group.id': 'order-processor-group',
    'auto.offset.reset': 'latest',
}
consumer = Consumer(conf)
consumer.subscribe([os.environ['KAFKA_TOPIC']])

last_heartbeat = time.time()
HEARTBEAT_INTERVAL = 60  # seconds

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg and not msg.error():
            process_message(msg)

        # Ping Vigilmon every 60 seconds of healthy consumption
        if time.time() - last_heartbeat > HEARTBEAT_INTERVAL:
            requests.get(os.environ['VIGILMON_HEARTBEAT_URL'], timeout=5)
            last_heartbeat = time.time()
finally:
    consumer.close()

Step 4: Alert Routing for Consumer Lag and Broker Failures

Kafka failures tend to cascade: a broker failure causes partition leadership elections, which causes brief consumer pauses, which causes lag spikes, which causes downstream processing delays. You want to know about broker failures before the cascade reaches your consumers.

Configure alert routing in Vigilmon:

| Monitor | Alert Channel | Priority | |---|---|---| | Kafka broker /health/kafka | Slack + PagerDuty | P1 | | Consumer lag /health/kafka/lag | Slack | P2 | | Heartbeat: order processor consumer | Slack + email | P2 | | Heartbeat: analytics consumer | Email | P3 |

Set response time thresholds for early warning:

  • Alert at 3000ms for the broker health endpoint (slow metadata fetch signals broker pressure)
  • Alert at 5000ms for consumer lag endpoint (admin API slowness precedes message processing delays)

For critical consumers, add a second heartbeat monitor with a tighter interval and no grace period — this acts as an emergency pager for complete consumer shutdown.


Summary

Kafka failures are silent by nature. You need external monitoring to catch broker degradation, consumer lag spikes, and consumer application stalls before they become data pipeline outages:

| Monitor Type | What It Covers | |---|---| | HTTP monitor on /health/kafka | Broker reachability, under-replicated partitions | | HTTP monitor on /health/kafka/lag | Consumer group lag threshold breaches | | Heartbeat monitor | Consumer application liveness, processing health |

Get started free at vigilmon.online — your first Kafka monitor is running in under two minutes.

Monitor your app with Vigilmon

Free plan — 5 monitors, no credit card required. Up and running in 60 seconds.

Start free →