Ingesting MQTT into TimescaleDB
August 15, 2022 · 5 min read · by Muhammad Amal · programming

TL;DR: A small Go service subscribes to the factory/+/+ MQTT topics, parses the JSON payloads, and batch-inserts them into a TimescaleDB hypertable with COPY. A single instance handles ~5000 msg/sec. Add a retention policy and continuous aggregates for analytics.

After the edge side comes the cloud side: receiving MQTT messages and persisting them. TimescaleDB (a Postgres extension) is the de facto choice in 2022 for SQL-shaped time-series data.

Schema

CREATE EXTENSION IF NOT EXISTS timescaledb;

CREATE TABLE sensor_readings (
  ts            timestamptz NOT NULL,
  device_id     text NOT NULL,
  metric        text NOT NULL,
  value         double precision NOT NULL,
  PRIMARY KEY (device_id, metric, ts)
);

SELECT create_hypertable('sensor_readings', 'ts', chunk_time_interval => interval '1 day');

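-- No separate (device_id, metric, ts DESC) index: the primary key's B-tree on
-- (device_id, metric, ts) already serves per-device queries, and Postgres can
-- scan it backwards for ORDER BY ts DESC.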

A hypertable is TimescaleDB’s automatically partitioned table. Here each chunk covers one day, old chunks can be compressed (see Compression below), and queries touch only the chunks whose time range they need.

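The primary key (device_id, metric, ts) makes upserts straightforward: if the same reading arrives twice, INSERT ... ON CONFLICT (device_id, metric, ts) DO UPDATE (or DO NOTHING) absorbs it. COPY, which the consumer below uses, has no ON CONFLICT clause, so a duplicate key makes the whole COPY batch fail with a unique violation.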

The consumer service

package main

import (
    "context"
    "encoding/json"
    "log"
    "strings"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"
)

type Reading struct {
    TS       time.Time `json:"ts"`
    Device   string    `json:"device"`
    Metric   string    `json:"metric"`
    Value    float64   `json:"value"`
}

type Ingester struct {
    db     *pgxpool.Pool
    buffer chan Reading
}

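// Run drains i.buffer into batches and flushes at 500 readings or every
// 500 ms, whichever comes first. On shutdown it flushes the readings it holds.
func (i *Ingester) Run(ctx context.Context) {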
    batch := make([]Reading, 0, 500)
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            i.flush(batch)
            return
        case r := <-i.buffer:
            batch = append(batch, r)
            if len(batch) >= 500 {
                i.flush(batch)
                batch = batch[:0]
            }
        case <-ticker.C:
            if len(batch) > 0 {
                i.flush(batch)
                batch = batch[:0]
            }
        }
    }
}

// flush writes one batch with COPY. On error the batch is logged and
// dropped; add a retry if you need every reading stored.
func (i *Ingester) flush(batch []Reading) {
    if len(batch) == 0 {
        return
    }
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    rows := make([][]any, len(batch))
    for j, r := range batch {
        rows[j] = []any{r.TS, r.Device, r.Metric, r.Value}
    }
    _, err := i.db.CopyFrom(ctx,
        pgx.Identifier{"sensor_readings"},
        []string{"ts", "device_id", "metric", "value"},
        pgx.CopyFromRows(rows),
    )
    if err != nil {
        log.Printf("copy %d rows: %v", len(batch), err)
    }
}

The consumer subscribes to MQTT and pushes parsed messages into a channel, which feeds the batch writer. A batch is flushed at 500 readings or after 500 ms, whichever comes first. CopyFrom, which uses the Postgres COPY protocol, is the fastest insert path in pgx.

The MQTT message handler, registered for the factory/+/+ subscription:

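func (i *Ingester) handleMessage(client mqtt.Client, msg mqtt.Message) {
    // Registered with e.g. client.Subscribe("factory/+/+", 1, i.handleMessage).
    // Topic layout: factory/<device>/<metric>
    parts := strings.Split(msg.Topic(), "/")
    if len(parts) != 3 || parts[0] != "factory" {
        return
    }
    device, metric := parts[1], parts[2]

    // Expected payload: {"ts": "<RFC 3339 timestamp>", "value": <number>}
    var p struct {
        TS    string   `json:"ts"`
        Value *float64 `json:"value"`
    }
    if err := json.Unmarshal(msg.Payload(), &p); err != nil || p.Value == nil {
        return // malformed JSON or no numeric value: drop
    }
    ts, err := time.Parse(time.RFC3339, p.TS)
    if err != nil {
        return // missing or malformed ts: drop instead of storing the zero time
    }

    select {
    case i.buffer <- Reading{TS: ts, Device: device, Metric: metric, Value: *p.Value}:
    default:
        log.Printf("buffer full, dropping message on %s", msg.Topic())
    }
}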

If the buffer is full, the message is dropped. For higher reliability you can block instead, but then backpressure reaches the MQTT client and delivery backs up. Pick the failure mode you can live with.

Throughput

Single Go consumer + single TimescaleDB instance, 4 vCPU, 16 GB RAM:

  • ~5000 msg/sec sustained
  • ~10K msg/sec burst
  • Latency from MQTT publish to DB: ~600ms (batching is the dominant component)

For 120 devices × 10 metrics × 1 Hz = 1200 msg/sec, this consumer is overkill. For 10K devices (100K msg/sec at the same rate), you’d shard by topic prefix across multiple consumers.
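The sizing arithmetic as a quick sketch. It assumes the ~5000 msg/sec sustained figure holds per consumer at larger scale and that the database keeps up, neither of which is measured here; msgRate and consumersNeeded are hypothetical helpers.

```go
package main

import "fmt"

// msgRate is messages per second for devices × metrics, each sampled hz times a second.
func msgRate(devices, metrics, hz int) int {
	return devices * metrics * hz
}

// consumersNeeded divides the load by one consumer's sustained throughput,
// rounding up.
func consumersNeeded(rate, perConsumer int) int {
	return (rate + perConsumer - 1) / perConsumer
}

func main() {
	const perConsumer = 5000 // sustained msg/sec measured above
	for _, devices := range []int{120, 10_000} {
		rate := msgRate(devices, 10, 1)
		fmt.Printf("%d devices: %d msg/sec, %d consumer(s)\n",
			devices, rate, consumersNeeded(rate, perConsumer))
	}
	// 120 devices: 1200 msg/sec, 1 consumer(s)
	// 10000 devices: 100000 msg/sec, 20 consumer(s)
}
```

Size against the sustained figure rather than the ~10K burst, so bursts have headroom.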

Retention policy

SELECT add_retention_policy('sensor_readings', INTERVAL '90 days');

A background job runs daily and drops chunks whose entire time range is older than 90 days, so storage stays bounded. For longer history at lower resolution, use continuous aggregates (next).

Continuous aggregates — downsampling

CREATE MATERIALIZED VIEW sensor_5min
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('5 minutes', ts) AS bucket,
  device_id,
  metric,
  avg(value) AS avg_value,
  min(value) AS min_value,
  max(value) AS max_value,
  count(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, device_id, metric;

SELECT add_continuous_aggregate_policy('sensor_5min',
  start_offset => INTERVAL '1 day',
  end_offset => INTERVAL '5 minutes',
  schedule_interval => INTERVAL '5 minutes');

Every 5 minutes, the policy incrementally refreshes the buckets between 1 day ago and 5 minutes ago. Queries against sensor_5min stay fast even for year-long ranges.

Keep raw data 90 days; aggregated 5-min data 2 years; aggregated 1-hour data forever. Tiered retention.

Compression

For chunks older than 7 days:

ALTER TABLE sensor_readings SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'device_id, metric',
  timescaledb.compress_orderby = 'ts DESC'
);

SELECT add_compression_policy('sensor_readings', INTERVAL '7 days');

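Chunks older than 7 days are compressed automatically by a background job. Typical savings: around 90% storage reduction. Queries on compressed chunks are slower, though filtering on the segmentby columns (device_id, metric) keeps them acceptable.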
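Part of the reason this data compresses so well: with compress_segmentby = 'device_id, metric', each compressed batch holds one series in ts order, and a steady 1 Hz series has constant gaps between timestamps. Delta-of-delta encoding, one of the algorithms TimescaleDB applies to timestamp and integer columns, turns such a run into mostly zeros that a later bit-packing stage stores in very little space. The sketch illustrates the idea on Unix-second timestamps only; it is not TimescaleDB’s on-disk format, and encodeDoD and decodeDoD are hypothetical names.

```go
package main

import "fmt"

// encodeDoD stores ts[0], then ts[1]-ts[0], then for each later point the
// change in delta: (ts[i]-ts[i-1]) - (ts[i-1]-ts[i-2]).
func encodeDoD(ts []int64) []int64 {
	out := make([]int64, len(ts))
	var prevDelta int64
	for i, t := range ts {
		switch i {
		case 0:
			out[i] = t
		case 1:
			prevDelta = t - ts[0]
			out[i] = prevDelta
		default:
			d := t - ts[i-1]
			out[i] = d - prevDelta
			prevDelta = d
		}
	}
	return out
}

// decodeDoD reverses encodeDoD.
func decodeDoD(enc []int64) []int64 {
	ts := make([]int64, len(enc))
	var delta int64
	for i, v := range enc {
		switch i {
		case 0:
			ts[i] = v
		case 1:
			delta = v
			ts[i] = ts[0] + delta
		default:
			delta += v
			ts[i] = ts[i-1] + delta
		}
	}
	return ts
}

func main() {
	// 1 Hz Unix-second timestamps with one missed sample at the end.
	ts := []int64{1660557600, 1660557601, 1660557602, 1660557603, 1660557605}
	enc := encodeDoD(ts)
	fmt.Println(enc)            // [1660557600 1 0 0 1]
	fmt.Println(decodeDoD(enc)) // [1660557600 1660557601 1660557602 1660557603 1660557605]
}
```

With compress_orderby = 'ts DESC' the series is stored newest first, so the deltas are -1 instead of 1, but the deltas of deltas are still zero. A missed sample in the middle of a run shows up as a short +1/-1 pair.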

Querying

Recent device state:

SELECT * FROM sensor_readings
WHERE device_id = 'press-42' AND metric = 'temperature'
  AND ts > now() - interval '1 hour'
ORDER BY ts DESC
LIMIT 100;

Average over a window using the continuous aggregate:

SELECT
  bucket,
  avg_value
FROM sensor_5min
WHERE device_id = 'press-42' AND metric = 'temperature'
  AND bucket > now() - interval '7 days'
ORDER BY bucket;

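Fast because sensor_5min is materialized. With real-time aggregation (the default in TimescaleDB 2.x at the time of writing), buckets newer than the last refresh are computed from sensor_readings at query time and merged in, so recent results are not stale.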

Aggregation across all devices:

SELECT
  time_bucket('1 hour', ts) AS bucket,
  metric,
  avg(value) AS avg_value
FROM sensor_readings
WHERE metric IN ('temperature', 'pressure')
  AND ts > now() - interval '24 hours'
GROUP BY bucket, metric
ORDER BY bucket;

time_bucket is the workhorse function: it groups rows into fixed-width time intervals of whatever size you choose.

Common Pitfalls

Insert one row at a time. ~100x slower. Always batch with COPY.

Hypertable with chunks too small. Daily chunks for 1 Hz data are fine. Hourly chunks = 24x more chunks = chunk planning overhead.

No retention policy. Disk fills.

Compression without continuous aggregates. Querying compressed data is slow; aggregates are fast. Use both.

Index on every column. Every index costs disk space and write throughput. Index for the queries you actually run.

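Forgetting to read the docs on continuous aggregate refresh policies. Stale aggregates surprise: buckets are only as fresh as the last refresh, and late data that lands outside the refresh window is never picked up by the policy. Tune start_offset and end_offset to your query patterns and data latency.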

Wrapping Up

MQTT consumer + TimescaleDB hypertable + COPY batching + retention + continuous aggregates. ~600 lines of Go total for a robust ingest service. Wednesday: Sparkplug B — industrial OT’s MQTT layer.