From MQTT to InfluxDB 2.7: A Telegraf Pipeline That Won't Drop Data
August 12, 2023 · 6 min read · by Muhammad Amal · programming

TL;DR:
  • Telegraf is the right Swiss Army knife between MQTT and InfluxDB, but its defaults are tuned for metrics scraping, not high-rate sensor ingest.
  • Tune metric_batch_size, metric_buffer_limit, and shared subscriptions.
  • Line protocol design (tags vs fields) is what dictates query performance two months later.

I’ve shipped this pipeline — MQTT broker into Telegraf into InfluxDB — at five plants in the last 18 months. It works. It’s also the part of the stack I see the most “it was fine in staging and now it’s losing data” incidents around, almost always because someone copy-pasted the README config and called it production.

This post is the working configuration I trust, with the tuning that took an embarrassing number of incidents to converge on. It assumes you’ve already got an MQTT broker running; see my EMQX clustering post for the upstream side.

Architecture in One Diagram, Words Only

[Sensors] --MQTT 5--> [Broker cluster]
                          |
                          | $share/telegraf/plant/+/data/#
                          v
                  [Telegraf 1.27 x N]
                          |
                          | influxdb_v2 output, batched
                          v
                  [InfluxDB 2.7 cluster]
                          |
                          | Flux / InfluxQL queries
                          v
                  [Grafana 10 / apps]

Telegraf scales horizontally. We use shared subscriptions on the broker so multiple Telegraf instances pull from the same topic without duplicating work. Each Telegraf is stateless aside from its in-memory buffer.
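For anyone who hasn’t used them: an MQTT 5 shared subscription is an ordinary topic filter prefixed with $share/<group>/, and the broker hands each matching message to one member of the group. The Python sketch below shows how that string breaks down and how the + (exactly one level) and # (this level and everything below it) wildcards match. The function names are made up for illustration; real brokers such as EMQX also handle edge cases this ignores, like $SYS topics.

```python
from typing import Optional


def parse_shared(subscription: str) -> tuple[Optional[str], str]:
    """Split a subscription into (share_group, topic_filter).

    Non-shared subscriptions come back as (None, subscription).
    """
    if subscription.startswith("$share/"):
        _, group, topic_filter = subscription.split("/", 2)
        return group, topic_filter
    return None, subscription


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Match a concrete topic against a filter with MQTT '+' and '#' wildcards."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, level in enumerate(f_levels):
        if level == "#":
            # '#' matches its parent level and everything below it.
            return True
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    # Without '#', the topic must have exactly as many levels as the filter.
    return len(f_levels) == len(t_levels)


group, flt = parse_shared("$share/telegraf/plant/+/data/#")
print(group, flt, topic_matches(flt, "plant/east/data/temperature"))
```

Every Telegraf instance that subscribes with the same group name shares the stream; give each instance its own group name and you are back to every instance receiving everything.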

The Telegraf Config I Actually Ship

# /etc/telegraf/telegraf.conf — Telegraf 1.27.x

[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 5000
  metric_buffer_limit = 500000
  collection_jitter = "0s"
  flush_interval = "5s"
  flush_jitter = "1s"
  precision = "1ms"
  omit_hostname = true
  logfile = ""
  debug = false
  quiet = false

[[inputs.mqtt_consumer]]
  servers = ["tcp://broker.iot.svc:1883"]
  topics = [
    "$share/telegraf/plant/+/data/#",
    "$share/telegraf/plant/+/status/#",
  ]
  qos = 1
  connection_timeout = "30s"
  max_undelivered_messages = 10000
  persistent_session = true
  client_id = "telegraf-${HOSTNAME}"
  username = "telegraf"
  password = "${MQTT_PASSWORD}"
  data_format = "json_v2"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "plant/+/data/+"
    tags  = "_/plant/_/measurement"

  [[inputs.mqtt_consumer.json_v2]]
    measurement_name_path = "measurement"
    timestamp_path = "ts"
    timestamp_format = "unix_ms"

    [[inputs.mqtt_consumer.json_v2.object]]
      path = "fields"
      disable_prepend_keys = true

    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "device_id"
    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "site"

[[outputs.influxdb_v2]]
  urls = ["https://influxdb-0.iot.svc:8086", "https://influxdb-1.iot.svc:8086"]
  token = "${INFLUX_TOKEN}"
  organization = "industrial"
  bucket = "telemetry"
  timeout = "10s"
  user_agent = "telegraf-iot/1.27"
  content_encoding = "gzip"
  insecure_skip_verify = false
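The input section implies a specific payload shape. As a rough illustration, here’s what one message should turn into. The payload layout (measurement, ts in Unix milliseconds, device_id, site, and a nested fields object) is inferred from the *_path settings, and to_metric is a hypothetical helper that approximates topic_parsing plus json_v2. It is not Telegraf’s parser.

```python
import json


def to_metric(topic: str, payload: bytes) -> dict:
    """Approximate the metric topic_parsing + json_v2 build from one message.

    Assumes a topic shaped like plant/<plant>/data/<measurement>.
    """
    doc = json.loads(payload)
    levels = topic.split("/")
    tags = {
        # topic_parsing tags = "_/plant/_/measurement": '_' skips a level,
        # any other name becomes the tag key for that level.
        "plant": levels[1],
        "measurement": levels[3],
        # json_v2 tag paths
        "device_id": doc["device_id"],
        "site": doc["site"],
    }
    return {
        "name": doc["measurement"],     # measurement_name_path
        "tags": tags,
        "fields": dict(doc["fields"]),  # object path "fields", keys not prefixed
        "time_ms": int(doc["ts"]),      # timestamp_format = "unix_ms"
    }


msg = (b'{"measurement": "temperature", "ts": 1691740800000, '
       b'"device_id": "dev-1", "site": "plant-east", "fields": {"value": 21.4}}')
print(to_metric("plant/east/data/temperature", msg))
```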

A few non-obvious choices:

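  • $share/telegraf/... topics. Without shared subscriptions every Telegraf instance receives every message, and InfluxDB will not bail you out. It only collapses points that share a measurement, tag set, and timestamp (the later write wins), so with N instances you pay N times the network and write load for the same data. Any message without a source timestamp also gets a different receive time on each instance and lands as a genuine duplicate. You’ll be filing tickets in a week.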
  • metric_batch_size = 5000, metric_buffer_limit = 500000. The default 1000/10000 is sized for inputs.cpu. For 30k msg/sec MQTT ingest, those defaults will drop data the moment InfluxDB hiccups.
  • persistent_session = true with a stable client_id. If Telegraf restarts, the broker holds messages for it. If you skip this you’ll lose every message in flight during a Telegraf rolling update.
  • precision = "1ms". Most sensors emit at millisecond resolution. Defaulting to seconds will collapse multiple readings into one point and you’ll lose data silently.
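The buffer sizing is plain arithmetic: the buffer is how many seconds of ingest Telegraf can hold while InfluxDB isn’t accepting writes. Here is a quick Python check using the 30k msg/sec figure, assuming one metric per MQTT message. Payloads that fan out into several metrics shrink the window further.

```python
def outage_headroom_s(buffer_limit: int, ingest_per_s: float) -> float:
    """Seconds of InfluxDB unavailability the output buffer can absorb."""
    return buffer_limit / ingest_per_s


def batches_in_buffer(buffer_limit: int, batch_size: int) -> int:
    """How many full write batches fit in the buffer."""
    return buffer_limit // batch_size


for label, limit in [("default", 10_000), ("tuned", 500_000)]:
    print(f"{label}: {outage_headroom_s(limit, 30_000):.2f} s of headroom")
# default: 0.33 s of headroom
# tuned: 16.67 s of headroom
```

Even the tuned buffer only covers a short hiccup at that rate. Anything longer is absorbed on the broker side.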

Line Protocol Design Is the Schema

This is the part everyone gets wrong on the first deployment. In InfluxDB, tags are indexed, fields are not. The cardinality of your tags determines how the storage engine performs. Get it right and queries take milliseconds. Get it wrong and you’re rewriting the ingest in three months.

Bad line protocol:

temperature,id=device-7c-9f-23-aa-11-04,site=plant-east,line=L1,zone=cold,vendor=acme value=21.4 1691740800000

That’s five tags. id alone has 50k distinct values. Series cardinality is bounded by the product of the tag value counts, so in the worst case you’ve just told InfluxDB to maintain millions of series, and the TSI index is going to ache.
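The product of tag value counts is the worst case. What the index actually pays for is the number of distinct tag combinations that occur, and that number explodes when a tag varies independently of the others. A small Python sketch over a hypothetical fleet shows the difference:

```python
from math import prod


def worst_case_series(points: list[dict[str, str]]) -> int:
    """Upper bound: product of the distinct values seen for each tag key."""
    keys = points[0].keys()
    return prod(len({p[k] for p in points}) for k in keys)


def observed_series(points: list[dict[str, str]]) -> int:
    """Actual series count: distinct tag sets that really occur."""
    return len({tuple(sorted(p.items())) for p in points})


# Hypothetical fleet: 4 devices, each pinned to one site and line,
# 3 readings per device.
fleet = [("d1", "east", "L1"), ("d2", "east", "L2"),
         ("d3", "west", "L1"), ("d4", "west", "L2")]
points = [{"id": d, "site": s, "line": l} for d, s, l in fleet for _ in range(3)]
print(worst_case_series(points), observed_series(points))  # 16 4

# Same readings with a per-message tag: every point becomes its own series.
tagged = [dict(p, request_id=str(i)) for i, p in enumerate(points)]
print(observed_series(tagged))  # 12
```

Tags pinned to the device add little beyond the device ID itself. A tag that changes per message multiplies straight through.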

Better:

temperature,device=device-7c-9f-23-aa-11-04,site=plant-east value=21.4,line="L1",zone="cold",vendor="acme" 1691740800000

Two tags. line, zone, and vendor become string fields. You lose the ability to GROUP BY line for free, but you can still filter on it in Flux (pivot() the fields into columns, then filter()), and each series key now carries two tags instead of five, so the keys and the index that stores them get smaller. The InfluxDB schema design guide is worth re-reading every six months.

The rule I use: a value can be a tag if you’ll routinely GROUP BY it, its cardinality is bounded by something physical (sites, lines, machine types), and it doesn’t change frequently. Everything else is a field.
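Once the tag/field split is decided, emitting line protocol is mechanical. The Python encoder below is a minimal sketch; in production you would use an official InfluxDB client library. Tag keys, tag values, and field keys escape commas, equals signs, and spaces. String field values are double-quoted, and integer fields get an i suffix. The timestamp passes through unchanged, so millisecond timestamps like the ones in these examples must be written with precision=ms.

```python
def _escape(s: str) -> str:
    # Tag keys, tag values and field keys: escape commas, equals signs, spaces.
    return s.replace(",", r"\,").replace("=", r"\=").replace(" ", r"\ ")


def _field_value(v) -> str:
    if isinstance(v, bool):  # check bool first: bool is a subclass of int
        return "true" if v else "false"
    if isinstance(v, int):
        return f"{v}i"       # integer fields need the 'i' suffix
    if isinstance(v, float):
        return repr(v)
    # Anything else becomes a string field: escape \ and ", then quote.
    s = str(v).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{s}"'


def to_line(measurement: str, tags: dict, fields: dict, ts: int) -> str:
    """Encode one point; tags are sorted by key, as InfluxDB recommends."""
    name = measurement.replace(",", r"\,").replace(" ", r"\ ")
    tag_part = "".join(f",{_escape(k)}={_escape(v)}" for k, v in sorted(tags.items()))
    field_part = ",".join(f"{_escape(k)}={_field_value(v)}" for k, v in fields.items())
    return f"{name}{tag_part} {field_part} {ts}"


print(to_line("temperature",
              {"site": "plant-east", "device": "device-7c-9f-23-aa-11-04"},
              {"value": 21.4, "line": "L1", "zone": "cold", "vendor": "acme"},
              1691740800000))
```

With two tags and line, zone, and vendor as fields, this prints temperature,device=device-7c-9f-23-aa-11-04,site=plant-east value=21.4,line="L1",zone="cold",vendor="acme" 1691740800000.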

InfluxDB 2.7 Bucket and Retention

Three buckets, not one:

# Hot bucket - 7 days, 1ms precision, raw telemetry
influx bucket create \
  --name telemetry \
  --retention 7d \
  --shard-group-duration 24h \
  --org industrial

# Warm bucket - 90 days, downsampled to 1-min aggregates
influx bucket create \
  --name telemetry_1m \
  --retention 90d \
  --shard-group-duration 7d \
  --org industrial

# Cold bucket - 2 years, downsampled to 1-hour aggregates
influx bucket create \
  --name telemetry_1h \
  --retention 730d \
  --shard-group-duration 30d \
  --org industrial
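One way to sanity-check those shard-group durations is to count how many shard groups each bucket holds at steady state. Assume InfluxDB deletes a shard group only once its entire time range has aged past retention. Then a retention window of length R over groups of length D overlaps at most ceil(R / D) + 1 of them. In Python:

```python
import math
from datetime import timedelta

# (retention, shard-group duration) per bucket, copied from the commands above.
BUCKETS = {
    "telemetry": (timedelta(days=7), timedelta(hours=24)),
    "telemetry_1m": (timedelta(days=90), timedelta(days=7)),
    "telemetry_1h": (timedelta(days=730), timedelta(days=30)),
}


def max_shard_groups(retention: timedelta, shard_group: timedelta) -> int:
    """Upper bound on shard groups overlapping one retention window."""
    return math.ceil(retention / shard_group) + 1


for name, (retention, shard_group) in BUCKETS.items():
    print(name, max_shard_groups(retention, shard_group))
# telemetry 8
# telemetry_1m 14
# telemetry_1h 26
```

All three buckets stay at a few dozen shard groups or fewer.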

A Flux task does the rollup. Run it every minute, scoped to the previous window:

// 1m downsample task
import "types"

option task = {name: "downsample-1m", every: 1m, offset: 30s}

from(bucket: "telemetry")
  |> range(start: -task.every, stop: now())
  |> filter(fn: (r) => r._measurement == "temperature"
                    or r._measurement == "vibration"
                    or r._measurement == "pressure")
  // mean() fails on string fields (e.g. line, zone, vendor), so keep numeric ones only
  |> filter(fn: (r) => types.isNumeric(v: r._value))
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
  |> to(bucket: "telemetry_1m", org: "industrial")

Pin the offset to 30 seconds so the task doesn’t race against late-arriving points. If your network has more jitter than that, raise it; the only cost is that the downsampled view lags by the extra offset.
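The offset trade-off is easier to see in miniature. This Python sketch models a single run of the task. The run is scheduled at t and covers the window [t - 1m, t), but it executes at t + offset, so only points that have arrived by then make it into the mean. The arrival times are hypothetical, and nothing else about InfluxDB’s task scheduler is modelled.

```python
from statistics import mean


def run_window(points, scheduled_s, every_s=60, offset_s=30):
    """Mean of values timestamped in [scheduled - every, scheduled) that had
    arrived by the time the task executes at scheduled + offset.

    points: iterable of (timestamp_s, arrival_s, value).
    Returns None when nothing qualifies (like createEmpty: false).
    """
    start, run_at = scheduled_s - every_s, scheduled_s + offset_s
    vals = [v for ts, arrived, v in points
            if start <= ts < scheduled_s and arrived <= run_at]
    return mean(vals) if vals else None


# Hypothetical: two readings in one minute, the second arriving 20 s late.
points = [(10, 10.5, 20.0), (50, 70.0, 22.0)]
print(run_window(points, scheduled_s=60, offset_s=30))  # 21.0
print(run_window(points, scheduled_s=60, offset_s=5))   # 20.0
```

Because each run only looks back one task.every, a point that misses its run is not picked up by the next one. It stays in the raw bucket and is simply absent from telemetry_1m.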

Failure Modes Worth Drilling

Telegraf falls behind. The MQTT input’s max_undelivered_messages is your circuit breaker. When InfluxDB is slow, Telegraf will hold up to 10000 unacked messages and then stop reading from MQTT. The broker’s offline queue absorbs the rest — until it fills, at which point messages get dropped at the broker. That cascade is the design. Watch broker queue depth and Telegraf buffer fill, alert before either saturates.
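The cascade also gives you a number to put in a runbook: how long a full InfluxDB stall can last before the broker starts dropping. Here is a back-of-envelope Python sketch. It assumes the shared subscription spreads load evenly and InfluxDB accepts nothing during the stall. broker_queue_len is a hypothetical per-session limit you would read from your own broker’s config.

```python
def seconds_until_broker_drops(ingest_per_s: float, instances: int,
                               max_undelivered: int, broker_queue_len: int) -> float:
    """Time from a full InfluxDB stall until the broker starts dropping.

    Model: ingest is spread evenly over `instances` sessions; each Telegraf
    reads up to `max_undelivered` unwritten messages and then stops, after
    which its broker session queues up to `broker_queue_len` more.
    """
    per_session_rate = ingest_per_s / instances
    capacity = max_undelivered + broker_queue_len
    return capacity / per_session_rate


# 30k msg/sec over 3 Telegrafs, hypothetical 50k-message session queue.
print(seconds_until_broker_drops(30_000, 3, 10_000, 50_000))  # 6.0
```

If that figure is shorter than a routine InfluxDB restart, the broker queue needs to grow or the alerts need to fire earlier.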

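InfluxDB token rotation. A rotated or expired token turns every write into a 401, and nothing pages you by default. Telegraf logs the errors and keeps retrying, so data backs up exactly as in the slow-InfluxDB case above until the broker queue overflows and messages start dropping. Have a rotation runbook that updates Telegraf’s secret and triggers a SIGHUP, and alert on internal_write errors > 0 for any output.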

Cardinality runaway. Someone adds a request_id or uuid tag and the TSI index doubles overnight. Set a Prometheus alert on InfluxDB’s storage_series_total per bucket, with a rate-of-change threshold.
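A rate-of-change threshold catches a runaway tag long before an absolute one would. The Python sketch below shows the logic such an alert rule encodes, applied to (timestamp, series count) samples you already scrape. The one-hour window and 1.5x growth limit are placeholders, not values from InfluxDB or from this setup.

```python
def cardinality_runaway(samples: list[tuple[int, int]],
                        window_s: int = 3600, max_growth: float = 1.5) -> bool:
    """True if the series count grew more than max_growth-fold within window_s.

    samples: (unix_seconds, series_count) pairs, oldest first.
    """
    if not samples:
        return False
    now_t, now_count = samples[-1]
    # Oldest sample still inside the window; the latest sample always qualifies.
    baseline = next(c for t, c in samples if t >= now_t - window_s)
    return baseline > 0 and now_count / baseline > max_growth


print(cardinality_runaway([(0, 100_000), (1800, 101_000), (3600, 102_000)]))  # False
print(cardinality_runaway([(0, 100_000), (1800, 150_000), (3600, 210_000)]))  # True
```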

Common Pitfalls

  • Treating inputs.mqtt_consumer like a Kafka consumer. It isn’t. There’s no offset commit story. If Telegraf crashes mid-batch and persistent_session = false, those messages are gone. Either use persistent sessions or accept the at-most-once semantics.
  • flush_interval longer than metric_buffer_limit / ingest_rate. If you ingest 10k/sec and your buffer is 500k, your flush has to happen at least every 50 seconds or you’ll start dropping. The math is unforgiving.
  • Gzip on small batches. content_encoding = "gzip" is a win for batches over ~5 KB, a wash or loss below that. With metric_batch_size = 5000 and ~80-byte points, you’re well into the win zone.
  • One Telegraf per broker. Run at least two and put them behind a shared subscription. Single Telegraf instances are single points of failure even when “the broker is highly available.”
  • Forgetting that precision truncates. Telegraf’s precision setting truncates the timestamp, and points that end up with the same measurement, tag set, and timestamp overwrite each other. If your sensor sends at microsecond precision and Telegraf is at s, you get one point per second per series even if 100 arrived. Match precision to source.
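The truncation pitfall is easy to reproduce. InfluxDB treats points with the same measurement, tag set, and timestamp as one point, with later field values overwriting earlier ones, so coarser timestamps quietly merge readings. The Python sketch below mimics that rule for a single-field series and a hypothetical burst of 100 readings within one second.

```python
def store(points, unit_ns):
    """Mimic InfluxDB overwrite semantics after truncating timestamps.

    points: list of (series_key, timestamp_ns, value) in write order.
    unit_ns: precision to truncate to (1_000_000_000 for seconds).
    Single field per point, so "last write wins" is the whole story.
    """
    db = {}
    for key, ts, value in points:
        db[(key, ts - ts % unit_ns)] = value
    return db


# 100 readings from one sensor, 10 ms apart, starting on a second boundary.
base = 1_691_740_800_000_000_000
burst = [("temperature,device=d1", base + i * 10_000_000, 20.0 + i / 100)
         for i in range(100)]
print(len(store(burst, 1_000_000)))      # 100 (ms precision keeps every reading)
print(len(store(burst, 1_000_000_000)))  # 1 (s precision keeps only the last)
```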

Wrapping Up

Telegraf into InfluxDB is the closest thing IIoT has to a “boring” telemetry pipeline in 2023, and that’s a compliment. Spend the time on line-protocol schema and shared-subscription topology up front and the operational story is genuinely quiet. Next post I’ll compare InfluxDB and TimescaleDB for the sensor-storage workload, because at some point every team has the “do we move to Postgres” conversation.