Kafka 3.5 as the Durable Buffer for Industrial IoT Ingestion
August 19, 2023 · 7 min read · by Muhammad Amal

TL;DR: Kafka belongs behind your MQTT broker, not in front of it. Use one topic per logical telemetry stream, partition by device_id, and let downstream consumers decide their materialization. Idempotent producers plus consumer-side dedupe is the realistic exactly-once story for IIoT in 2023.

I’ve watched too many architecture diagrams put Kafka where the MQTT broker should be. Sensors talk MQTT — that’s the protocol the devices, the gateways, and the SCADA tooling all speak. Kafka is a brilliant durable log; it’s not a device-facing protocol. Put the broker at the edge of your system and Kafka one step inside, and the architecture stops fighting you.

This is the pattern I’ve landed on after running this stack at three different scales in 2023. The MQTT broker (typically EMQX, per my earlier post) fans messages into Kafka topics via its rule engine. Kafka is the durable buffer. Downstream consumers — time-series writers, stream processors, ML feature pipelines, archival — read independently with their own offsets and their own failure semantics.

Why Kafka At All If You Have a Broker

The MQTT broker is a router with limited persistence. Kafka is durable, replayable storage. The two solve different problems:

  • Replay. A new analytics team joins six months in and wants the last 90 days of vibration data to backfill a model. With Kafka, as long as retention.ms already covers 90 days, they start a new consumer group from the earliest offset. With only an MQTT broker, the data isn’t there.
  • Multiple independent consumers. Five downstream systems each want a copy. With Kafka, each consumer group keeps its own offsets and can pause/resume without affecting others. With MQTT, each system holds its own subscription (shared subscriptions only spread one stream across workers), and a slow subscriber either backs up the broker or loses messages.
  • Schema evolution and versioning. Schema Registry plus Avro or Protobuf gives you a typed contract between sensor payloads and downstream code. MQTT payloads are bytes.
  • Stream processing. Flink, Kafka Streams, ksqlDB — the entire stream-processing ecosystem speaks Kafka, not MQTT.
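retention.ms is in milliseconds, and it has to cover the replay window before the data ages out: raising it after the fact doesn’t bring back segments that were already deleted. A small Go sketch of the conversion (retentionMs is a hypothetical helper, not a Kafka API):

```go
package main

import (
    "fmt"
    "time"
)

// retentionMs converts a replay window in days into the millisecond value
// the topic-level retention.ms config expects. Hypothetical helper.
func retentionMs(days int) int64 {
    return (time.Duration(days) * 24 * time.Hour).Milliseconds()
}

func main() {
    // 90-day window for the analytics backfill scenario.
    fmt.Printf("--config retention.ms=%d\n", retentionMs(90))
    // 7 days, the retention used for iot.telemetry.raw below.
    fmt.Printf("--config retention.ms=%d\n", retentionMs(7))
}
```

It prints 7776000000 for the 90-day window and 604800000 for 7 days.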

If you don’t need any of those — single consumer, no replay, no schema discipline — you don’t need Kafka. I’ve shipped MQTT-direct-to-InfluxDB pipelines that ran for years.

Topic Design

For an industrial customer with mixed telemetry, I keep it boring:

iot.telemetry.raw           # all raw sensor readings, untransformed
iot.telemetry.normalized    # post-validation, canonical schema
iot.alerts                  # threshold and anomaly events
iot.commands                # device-bound commands (separate ACL)
iot.lifecycle               # connect/disconnect, firmware events
iot.dlq                     # dead-letter for malformed payloads

Six topics, not sixty. Resist the temptation to create iot.plant.east.line.3.vibration. Topic explosions are the Kafka equivalent of MQTT cardinality blowups — the controller eats more memory, rebalances get slower, and your operations team starts hating you.

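Partition count is what gives you parallelism. Rule of thumb for IIoT: enough partitions that no single partition exceeds ~10 MB/s sustained, and at least one partition per consumer thread you plan to run. For a 30k msg/sec stream at 256-byte average payload, that’s ~7.7 MB/s total, so throughput alone fits in one partition; 12 partitions gives you plenty of headroom and parallelism for 12 consumer threads. The command below creates 24, leaving room to double the consumer count without repartitioning.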
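The rule of thumb is really two constraints, a per-partition byte-rate ceiling and one partition per consumer thread, and the partition count is whichever is larger. A sketch of that arithmetic; partitionsFor and the 10 MB/s ceiling are illustrative assumptions, not Kafka settings:

```go
package main

import "fmt"

// partitionsFor returns the larger of two lower bounds: enough partitions
// to keep each under ceilingBytesPerSec, and one per consumer thread.
// Hypothetical helper for illustration.
func partitionsFor(msgPerSec, avgPayloadBytes, ceilingBytesPerSec, consumerThreads int) int {
    total := msgPerSec * avgPayloadBytes
    // Ceiling division: partitions needed so none exceeds the byte-rate ceiling.
    byThroughput := (total + ceilingBytesPerSec - 1) / ceilingBytesPerSec
    if byThroughput > consumerThreads {
        return byThroughput
    }
    return consumerThreads
}

func main() {
    // 30k msg/s at 256 B is 7,680,000 B/s: one partition by throughput,
    // so the 12 consumer threads decide the answer.
    fmt.Println(partitionsFor(30_000, 256, 10_000_000, 12))
    // Ten times the traffic is 76.8 MB/s, which alone needs 8 partitions.
    fmt.Println(partitionsFor(300_000, 256, 10_000_000, 4))
}
```

For the 30k msg/sec example the consumer count sets the answer at 12; at ten times the traffic the byte-rate ceiling alone calls for 8.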

# Create the raw telemetry topic: 24 partitions, RF=3, 7-day retention, 512 MiB segments
kafka-topics.sh --bootstrap-server kafka-0:9092 --create \
  --topic iot.telemetry.raw \
  --partitions 24 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config retention.ms=604800000 \
  --config compression.type=zstd \
  --config segment.bytes=536870912

A few of those that matter:

  • min.insync.replicas=2 with RF=3. Lose one broker and acks=all writes still succeed. Lose two and the partition rejects acks=all writes (NOT_ENOUGH_REPLICAS), which is what you want for durability.
  • compression.type=zstd. Better ratio than snappy at modest CPU cost. For sensor data with repeated tags, ratios are excellent.
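  • segment.bytes=536870912 (512 MiB). Larger segments mean less file overhead, at the cost of slower retention enforcement: retention only deletes closed segments, so data can outlive retention.ms until its segment rolls (at segment.bytes, or at segment.ms, which defaults to 7 days). Fine for IIoT, where you’re sizing for time, not bytes.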
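Two of these settings are easier to reason about as arithmetic. The sketch below models the min.insync.replicas rule (assuming every surviving replica is in sync, which a real cluster doesn’t guarantee) and estimates how long a 512 MiB segment takes to fill, assuming the ~7.68 MB/s from the sizing example spreads evenly over 24 partitions. Both helpers are hypothetical:

```go
package main

import "fmt"

// acksAllWritable models min.insync.replicas: an acks=all write succeeds only
// while the in-sync replica count is at least minISR. Simplification: every
// surviving replica is assumed to be in sync.
func acksAllWritable(replicationFactor, minISR, replicasLost int) bool {
    return replicationFactor-replicasLost >= minISR
}

// segmentRollSeconds estimates how long one partition takes to fill a
// segment, assuming topic traffic spreads evenly across partitions.
// Retention deletes whole closed segments, so this approximates how long
// data can linger past retention.ms.
func segmentRollSeconds(segmentBytes, topicBytesPerSec, partitions int) float64 {
    perPartition := float64(topicBytesPerSec) / float64(partitions)
    return float64(segmentBytes) / perPartition
}

func main() {
    for lost := 0; lost <= 2; lost++ {
        fmt.Printf("RF=3, min.insync.replicas=2, %d replica(s) lost: acks=all writable=%v\n",
            lost, acksAllWritable(3, 2, lost))
    }
    // 512 MiB segments, ~7.68 MB/s spread over 24 partitions.
    fmt.Printf("segment fills in ~%.0f minutes\n", segmentRollSeconds(536870912, 7_680_000, 24)/60)
}
```

At roughly 320 KB/s per partition a segment fills in about 28 minutes, which is small next to a 7-day retention window.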

Partitioning Strategy: Device ID Is the Key

Partition by device_id. Period. The producer (EMQX, in my setup) sets the Kafka message key to the device ID:

# EMQX -> Kafka producer config
bridges.kafka_producer.telemetry_raw {
  bootstrap_hosts = "kafka-0:9092,kafka-1:9092,kafka-2:9092"
  topic = "iot.telemetry.raw"
  message {
    key   = "${clientid}"          # device_id
    value = "${payload}"
  }
  producer {
    required_acks         = all
    enable_idempotence    = true
    compression           = zstd
    partition_strategy    = key_dispatch
    max_inflight_requests = 5
    max_batch_bytes       = 896KB
  }
}

Why device ID:

  1. Order is preserved per device. Stream processors can assume a device’s events arrive in order.
  2. Per-device work stays on one partition. A windowed aggregation per device needs state from a single partition, so consumers scale out without coordinating across partitions.
  3. Skew is bounded. Devices have roughly similar publication rates. If you partition by plant instead, one busy plant pegs one partition.
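Key-based routing is what pins each device to one partition. The sketch ports the Kafka Java client’s default key partitioner to Go: murmur2 over the key bytes, masked to a positive int, modulo the partition count. EMQX’s key_dispatch may use a different hash, so treat this as an illustration of the property (same key, same partition) rather than of EMQX’s exact mapping; the device IDs are made up. It also shows why growing a topic later hurts: changing the modulus moves keys.

```go
package main

import (
    "fmt"
    "strconv"
)

// murmur2 ports the hash used by the Kafka Java client's default
// partitioner (org.apache.kafka.common.utils.Utils.murmur2).
func murmur2(data []byte) uint32 {
    const (
        seed uint32 = 0x9747b28c
        m    uint32 = 0x5bd1e995
        r           = 24
    )
    h := seed ^ uint32(len(data))
    n := len(data) / 4
    for i := 0; i < n; i++ {
        i4 := i * 4
        k := uint32(data[i4]) | uint32(data[i4+1])<<8 | uint32(data[i4+2])<<16 | uint32(data[i4+3])<<24
        k *= m
        k ^= k >> r
        k *= m
        h *= m
        h ^= k
    }
    // Mix in the 1-3 trailing bytes, like the Java switch fall-through.
    tail := data[n*4:]
    switch len(tail) {
    case 3:
        h ^= uint32(tail[2]) << 16
        fallthrough
    case 2:
        h ^= uint32(tail[1]) << 8
        fallthrough
    case 1:
        h ^= uint32(tail[0])
        h *= m
    }
    h ^= h >> 13
    h *= m
    h ^= h >> 15
    return h
}

// partitionFor mirrors toPositive(murmur2(key)) % numPartitions.
func partitionFor(key string, numPartitions int) int {
    return int(murmur2([]byte(key))&0x7fffffff) % numPartitions
}

// movedKeys counts how many of n synthetic device IDs land on a different
// partition when a topic grows from `from` to `to` partitions.
func movedKeys(n, from, to int) int {
    moved := 0
    for i := 0; i < n; i++ {
        key := "device-" + strconv.Itoa(i)
        if partitionFor(key, from) != partitionFor(key, to) {
            moved++
        }
    }
    return moved
}

func main() {
    for _, id := range []string{"plc-line3-017", "plc-line3-018", "vib-east-002"} {
        fmt.Printf("%s -> partition %d of 24\n", id, partitionFor(id, 24))
    }
    fmt.Printf("12 -> 24 partitions: %d of 1000 keys move\n", movedKeys(1000, 12, 24))
}
```

Going from 12 to 24 partitions moves every key whose hash modulo 24 is 12 or higher, roughly half of a well-spread key set, and each moved device’s ordering is only guaranteed again once the old partition drains.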

The enable_idempotence = true setting is critical. It lets the producer retry without creating duplicates. Combined with required_acks = all, this is the closest thing to exactly-once you’ll need for telemetry.
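Idempotence works because the producer stamps each batch with a producer ID and a per-partition sequence number, and the broker drops a batch whose sequence it has already written. Here’s a deliberately simplified model of that broker-side check; the real broker remembers the last five batches per producer and partition rather than a single number, and these types are hypothetical:

```go
package main

import (
    "errors"
    "fmt"
)

// producerPartition identifies one sequence stream: sequences are tracked
// per producer ID and per partition.
type producerPartition struct {
    producerID int64
    partition  int32
}

// dedupLog is a toy partition log that applies the idempotence check.
type dedupLog struct {
    lastSeq map[producerPartition]int32
    records []string
}

var errOutOfOrder = errors.New("out of order sequence")

func newDedupLog() *dedupLog {
    return &dedupLog{lastSeq: make(map[producerPartition]int32)}
}

// write returns false for a retry of an already-written sequence and
// errOutOfOrder for a gap (a batch went missing in between).
func (l *dedupLog) write(pid int64, partition, seq int32, value string) (bool, error) {
    key := producerPartition{pid, partition}
    last, seen := l.lastSeq[key]
    if seen && seq <= last {
        return false, nil // duplicate from a producer retry: drop it
    }
    expected := int32(0)
    if seen {
        expected = last + 1
    }
    if seq != expected {
        return false, errOutOfOrder
    }
    l.lastSeq[key] = seq
    l.records = append(l.records, value)
    return true, nil
}

func main() {
    l := newDedupLog()
    l.write(1, 0, 0, "r0")
    l.write(1, 0, 1, "r1")
    // The ack for seq 1 is lost, so the producer retries the same batch.
    ok, _ := l.write(1, 0, 1, "r1")
    fmt.Println(ok, len(l.records)) // false 2
    // A restarted producer gets a new ID: the same payload is new data.
    ok, _ = l.write(2, 0, 0, "r1")
    fmt.Println(ok, len(l.records)) // true 3
}
```

The last call is the caveat worth remembering: a producer that restarts gets a new ID, so the broker treats a re-sent payload as new data.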

Consumer: Validation and Fan-Out

Here’s a minimal Go consumer that validates and forwards into iot.telemetry.normalized, written for Kafka 3.5 with franz-go:

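// go.mod: github.com/twmb/franz-go v1.14.x (log/slog needs Go 1.21+)
package main

import (
    "context"
    "encoding/json"
    "log/slog"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
)

type Reading struct {
    DeviceID string  `json:"device_id"`
    Metric   string  `json:"metric"`
    Value    float64 `json:"value"`
    TS       int64   `json:"ts"`
    Quality  int     `json:"quality"` // OPC-style quality; 192 (0xC0) = good
}

func main() {
    log := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    if err := run(log); err != nil {
        log.Error("normalizer stopped", "err", err)
        os.Exit(1) // on restart the group resumes from the last committed offset
    }
}

func run(log *slog.Logger) error {
    cl, err := kgo.NewClient(
        kgo.SeedBrokers("kafka-0:9092", "kafka-1:9092", "kafka-2:9092"),
        kgo.ConsumerGroup("telemetry-normalizer"),
        kgo.ConsumeTopics("iot.telemetry.raw"),
        kgo.DisableAutoCommit(), // commit only after outputs are acked
        kgo.FetchMaxBytes(50<<20),
        kgo.FetchMaxWait(500*time.Millisecond),
        kgo.ProducerLinger(20*time.Millisecond),
        kgo.RequiredAcks(kgo.AllISRAcks()), // franz-go producers are idempotent by default
        kgo.ProducerBatchCompression(kgo.ZstdCompression()),
    )
    if err != nil {
        return err
    }
    defer cl.Close()

    // Stop cleanly on SIGINT/SIGTERM.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    for {
        fetches := cl.PollFetches(ctx)
        if fetches.IsClientClosed() || ctx.Err() != nil {
            return nil
        }
        // Fetch errors are per partition. Log them but still handle the
        // records that did arrive: skipping them here would let the next
        // commit move past records that were never processed.
        for _, e := range fetches.Errors() {
            log.Error("fetch", "topic", e.Topic, "partition", e.Partition, "err", e.Err)
        }

        produces := make([]*kgo.Record, 0, 1024)
        fetches.EachRecord(func(r *kgo.Record) {
            var rd Reading
            if err := json.Unmarshal(r.Value, &rd); err != nil {
                // Malformed payload: park the original bytes in the DLQ.
                produces = append(produces, &kgo.Record{
                    Topic: "iot.dlq", Key: r.Key, Value: r.Value,
                })
                return
            }
            if rd.Quality < 192 || rd.TS == 0 {
                return // drop, count in metrics
            }
            out, _ := json.Marshal(rd)
            produces = append(produces, &kgo.Record{
                Topic: "iot.telemetry.normalized",
                Key:   r.Key, // keep device_id as the key so ordering carries over
                Value: out,
            })
        })

        if err := cl.ProduceSync(ctx, produces...).FirstErr(); err != nil {
            if ctx.Err() != nil {
                return nil // shutting down; uncommitted work is redone on restart
            }
            // Don't keep polling: the next commit would cover this batch's
            // offsets too and silently drop it. Bail out and reprocess.
            return err
        }
        if err := cl.CommitUncommittedOffsets(ctx); err != nil {
            log.Error("commit", "err", err) // a later commit covers these offsets
        }
    }
}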

The order matters: produce, then commit. If we commit first and crash before producing, we lose data. If we produce first and crash before committing, we re-process and produce duplicates. Producer idempotence doesn’t save us here: it only dedupes retries within one producer session, and the restarted process gets a new producer ID. That’s fine as long as downstream consumers are dedup-tolerant by message ID, which they should be anyway.
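Consumer-side dedupe needs a message ID. The Reading payload above doesn’t carry one, so this sketch assumes the (device_id, metric, ts) triple is unique per reading and keeps a bounded LRU window of recently seen keys; the type names and capacity are hypothetical. A bounded window catches redeliveries that arrive close together, which is what a crash-and-reprocess replay produces. An in-memory window is lost when the deduping consumer itself restarts, so a time-series store that upserts on the same key is the sturdier option.

```go
package main

import (
    "container/list"
    "fmt"
)

// readingKey is the assumed identity of one reading.
type readingKey struct {
    DeviceID string
    Metric   string
    TS       int64
}

// seenWindow remembers the most recent `capacity` keys with LRU eviction.
type seenWindow struct {
    capacity int
    order    *list.List // front = most recently seen
    index    map[readingKey]*list.Element
}

func newSeenWindow(capacity int) *seenWindow {
    return &seenWindow{capacity: capacity, order: list.New(), index: make(map[readingKey]*list.Element)}
}

// firstSeen reports whether k is new, recording it either way.
func (w *seenWindow) firstSeen(k readingKey) bool {
    if el, ok := w.index[k]; ok {
        w.order.MoveToFront(el)
        return false
    }
    w.index[k] = w.order.PushFront(k)
    if w.order.Len() > w.capacity {
        oldest := w.order.Back()
        w.order.Remove(oldest)
        delete(w.index, oldest.Value.(readingKey))
    }
    return true
}

func main() {
    w := newSeenWindow(2)
    a := readingKey{"plc-17", "temp", 1692403200000}
    b := readingKey{"plc-17", "temp", 1692403201000}
    c := readingKey{"plc-17", "temp", 1692403202000}
    fmt.Println(w.firstSeen(a), w.firstSeen(a)) // true false
    w.firstSeen(b)
    w.firstSeen(c)              // evicts a, the least recently seen key
    fmt.Println(w.firstSeen(a)) // true: a fell out of the window
}
```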

KRaft and the End of ZooKeeper

In 2023 there’s no reason to start a new Kafka deployment with ZooKeeper. KRaft has been production-ready for new clusters since Kafka 3.3, and 3.5 deprecates ZooKeeper mode. It’s operationally simpler: one fewer cluster to babysit, one fewer set of failure modes. A three-node KRaft controller quorum runs alongside your brokers (or on the same nodes for small deployments).

The migration path for existing ZK clusters exists, but in 3.5 it is still early access and not for the squeamish. New deployments start KRaft. The Apache Kafka KRaft documentation is the right place to read about quorum sizing.
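Quorum sizing mostly comes down to majority arithmetic: a controller quorum of n voters needs n/2+1 of them alive, so three voters survive one failure, five survive two, and four buys nothing over three. The sketch also renders the controller.quorum.voters value in its id@host:port form; the host names, node IDs and port 9093 are assumptions for this example.

```go
package main

import (
    "fmt"
    "strconv"
    "strings"
)

// majority is the number of live voters a Raft-style quorum needs.
func majority(voters int) int { return voters/2 + 1 }

// toleratedFailures is how many voters can fail while a majority survives.
func toleratedFailures(voters int) int { return voters - majority(voters) }

// quorumVoters renders controller.quorum.voters, numbering node IDs from 1.
func quorumVoters(hosts []string, port int) string {
    parts := make([]string, len(hosts))
    for i, h := range hosts {
        parts[i] = strconv.Itoa(i+1) + "@" + h + ":" + strconv.Itoa(port)
    }
    return strings.Join(parts, ",")
}

func main() {
    for _, n := range []int{3, 4, 5} {
        fmt.Printf("%d voters: tolerates %d failure(s)\n", n, toleratedFailures(n))
    }
    fmt.Println("controller.quorum.voters=" + quorumVoters([]string{"kafka-0", "kafka-1", "kafka-2"}, 9093))
}
```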

Common Pitfalls

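  • Too few partitions. Picking partition count = consumer count and never revisiting it. When traffic grows 10x you can add partitions, but that changes which partition each existing key hashes to, so per-device ordering breaks across the change. Overprovision partitions at create time (24, 48) and add consumers as load grows.
  • acks=1 and surprise data loss. acks=1 was the Java producer default before Kafka 3.0 and still lingers in older configs. If the leader dies before its followers replicate, acknowledged writes vanish in the leader election. Pay the latency penalty of acks=all for telemetry you actually care about.
  • Consumer rebalance storms. The default session.timeout.ms of 45s plus max.poll.interval.ms of 5 min interact poorly with long-running per-batch work: a consumer that goes longer than max.poll.interval.ms between polls is evicted from the group, which triggers a rebalance. Either keep per-batch work short or use cooperative-sticky rebalancing (partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor in the Java client), so consumers keep working on partitions that aren’t moving during a rebalance.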
  • Schema Registry as an afterthought. Six months in, two teams disagree on what ts means (epoch ms vs RFC3339) and the data lake is a mess. Pick Avro or Protobuf, run Schema Registry, enforce schema on produce.
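  • Retention by size, not time. retention.bytes is tempting for cost control, but it applies per partition and is enforced by deleting whole segments: with 512 MiB segments a partition can sit up to a segment above the limit, and a 24-partition topic holds 24 times the configured value. Set retention by time (retention.ms) and budget capacity for peak load.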
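The ts disagreement is cheap to fix at the normalizer once you pick one canonical form. A sketch, assuming epoch milliseconds is the canonical choice and that producers send either a JSON number of epoch ms or an RFC 3339 string; normalizeTS is a hypothetical helper, and in practice the registered schema should make this choice for you:

```go
package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "time"
)

// normalizeTS accepts a raw JSON ts value that is either a number (assumed
// to be epoch milliseconds) or an RFC 3339 string, and returns epoch ms.
func normalizeTS(raw []byte) (int64, error) {
    var ms int64
    if err := json.Unmarshal(raw, &ms); err == nil {
        return ms, nil
    }
    var s string
    if err := json.Unmarshal(raw, &s); err != nil {
        return 0, errors.New("ts is neither a number nor a string")
    }
    // RFC3339Nano also parses strings without fractional seconds.
    t, err := time.Parse(time.RFC3339Nano, s)
    if err != nil {
        return 0, fmt.Errorf("ts string is not RFC 3339: %w", err)
    }
    return t.UnixMilli(), nil
}

func main() {
    for _, raw := range []string{`1692403200000`, `"2023-08-19T00:00:00Z"`, `"2023-08-19T02:00:00+02:00"`} {
        ms, err := normalizeTS([]byte(raw))
        fmt.Println(raw, "->", ms, err)
    }
}
```

All three inputs describe the same instant and normalize to 1692403200000.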

Wrapping Up

Kafka is the right durable buffer between the MQTT broker and the long tail of downstream consumers, but only once you’ve got more than one consumer or a replay requirement. Until then, it’s a complex piece of infrastructure earning its keep against an empty room. Next post I’ll move down the stack to the edge — running compute on K3s alongside the sensors themselves, before the data ever hits a broker.