background-shape
Reading Sensors from Go on the Edge
August 12, 2022 · 5 min read · by Muhammad Amal programming

TL;DR — Go is a good fit for edge gateways: single binary, cross-compile to ARM, ~10MB at rest. Read sensors via periph.io (GPIO, I2C, SPI) or serial port. Publish to MQTT with auto-reconnect. Buffer locally on disconnect.

After data formats, the actual edge code. The Raspberry Pi (or industrial PC) running near sensors. Go’s small footprint and easy cross-compile make it a natural choice.

The shape of an edge service

[Sensor (BME280 via I2C)]
[Read loop, 1 Hz]
[Local buffer (in-memory or BoltDB)]
[MQTT publisher with auto-reconnect]
[Broker (Mosquitto)]

When connected: publish immediately. When disconnected: buffer locally. On reconnect: drain buffer.

Cross-compile to ARM

GOOS=linux GOARCH=arm GOARM=7 go build -o sensor-reader ./cmd/reader

# For 64-bit ARM (Pi 4 64-bit OS)
GOOS=linux GOARCH=arm64 go build -o sensor-reader ./cmd/reader

Single static binary. Copy to the Pi, run. No dependencies to install on target.

For static linking with CGO (some sensor libraries need it):

CGO_ENABLED=1 GOOS=linux GOARCH=arm GOARM=7 \
  CC=arm-linux-gnueabihf-gcc \
  go build -o sensor-reader ./cmd/reader

Or build inside a Pi-targeting container.

Reading an I2C sensor (BME280)

The BME280 is a common temperature/humidity/pressure sensor. Connected via I2C.

package main

import (
    "context"
    "log"
    "time"

    "periph.io/x/conn/v3/i2c/i2creg"
    "periph.io/x/devices/v3/bmxx80"
    "periph.io/x/host/v3"
)

func main() {
    if _, err := host.Init(); err != nil {
        log.Fatal(err)
    }

    bus, err := i2creg.Open("")
    if err != nil {
        log.Fatal(err)
    }
    defer bus.Close()

    dev, err := bmxx80.NewI2C(bus, 0x76, &bmxx80.DefaultOpts)
    if err != nil {
        log.Fatal(err)
    }
    defer dev.Halt()

    ctx := context.Background()
    tick := time.NewTicker(time.Second)
    defer tick.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-tick.C:
            var env bmxx80.Environment
            if err := dev.Sense(&env); err != nil {
                log.Printf("sense: %v", err)
                continue
            }
            log.Printf("T=%v°C P=%vPa H=%v%%",
                env.Temperature.Celsius(),
                env.Pressure,
                env.Humidity)
        }
    }
}

periph.io is the de facto Go peripheral library. Supports GPIO, I2C, SPI, 1-wire. Pure Go where possible, CGO for some.

Publishing to MQTT with auto-reconnect

import mqtt "github.com/eclipse/paho.mqtt.golang"

func newMQTTClient(broker, user, pass, clientID string) mqtt.Client {
    opts := mqtt.NewClientOptions().
        AddBroker(broker).
        SetUsername(user).
        SetPassword(pass).
        SetClientID(clientID).
        SetAutoReconnect(true).
        SetConnectRetry(true).
        SetConnectRetryInterval(5 * time.Second).
        SetMaxReconnectInterval(30 * time.Second).
        SetKeepAlive(30 * time.Second).
        SetCleanSession(false).
        SetWill(fmt.Sprintf("factory/%s/status", clientID), "offline", 1, true)

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        log.Fatalf("connect: %v", token.Error())
    }
    return client
}

Configuration that matters:

  • SetAutoReconnect(true) — reconnect on disconnect (default true; explicit for clarity)
  • SetCleanSession(false) — broker queues messages for us while disconnected (for QoS 1+)
  • SetWill(...) — broker publishes “offline” if we die ungracefully
  • SetConnectRetry(true) — retry initial connect (default false; useful at boot when broker may be slow)

Buffering on disconnect

For QoS 1/2 with CleanSession=false, the Paho client buffers in memory. Limited capacity; lost on process restart.

For larger buffers or restart durability, use BoltDB:

import bolt "go.etcd.io/bbolt"

type Buffer struct {
    db *bolt.DB
}

func NewBuffer(path string) (*Buffer, error) {
    db, err := bolt.Open(path, 0600, nil)
    if err != nil { return nil, err }
    err = db.Update(func(tx *bolt.Tx) error {
        _, err := tx.CreateBucketIfNotExists([]byte("pending"))
        return err
    })
    return &Buffer{db: db}, err
}

func (b *Buffer) Add(topic string, payload []byte) error {
    return b.db.Update(func(tx *bolt.Tx) error {
        bkt := tx.Bucket([]byte("pending"))
        id, _ := bkt.NextSequence()
        key := fmt.Sprintf("%020d", id)
        return bkt.Put([]byte(key), encode(topic, payload))
    })
}

func (b *Buffer) Drain(client mqtt.Client) {
    b.db.Update(func(tx *bolt.Tx) error {
        bkt := tx.Bucket([]byte("pending"))
        c := bkt.Cursor()
        for k, v := c.First(); k != nil; k, v = c.Next() {
            topic, payload := decode(v)
            token := client.Publish(topic, 1, false, payload)
            if !token.WaitTimeout(5*time.Second) {
                return fmt.Errorf("publish timeout")
            }
            bkt.Delete(k)
        }
        return nil
    })
}

On disconnect, write to BoltDB. On reconnect, drain in order.

For factories where 10-second blips happen daily, this saves real data. Without it, you lose ~10 readings per blip.

Sensor → MQTT loop

func runLoop(ctx context.Context, dev *bmxx80.Dev, client mqtt.Client, buf *Buffer) error {
    tick := time.NewTicker(time.Second)
    defer tick.Stop()

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-tick.C:
            var env bmxx80.Environment
            if err := dev.Sense(&env); err != nil {
                log.Printf("sense: %v", err)
                continue
            }

            payload, _ := json.Marshal(map[string]any{
                "ts":          time.Now().UTC().Format(time.RFC3339),
                "device":      clientID,
                "temperature": env.Temperature.Celsius(),
                "pressure":    float64(env.Pressure) / 1000.0,  // kPa
                "humidity":    float64(env.Humidity) / 100.0,
            })

            topic := fmt.Sprintf("factory/%s/env", clientID)
            if !client.IsConnected() {
                buf.Add(topic, payload)
                continue
            }

            token := client.Publish(topic, 1, false, payload)
            if !token.WaitTimeout(2*time.Second) {
                buf.Add(topic, payload)
            }
        }
    }
}

Read; format; if connected, publish; otherwise buffer.

Resource footprint

For the BME280 + MQTT setup on a Pi 4:

  • Binary size: ~12 MB
  • RSS at runtime: ~14 MB
  • CPU at 1 Hz sensor read: ~0.1%
  • Cold start: <300 ms

Plenty of headroom on a Pi for tens of similar services running concurrently. Compare to Python equivalents (40-80 MB RSS) or Node (60-100 MB).

Common Pitfalls

Forgetting CleanSession=false. Disconnect = subscriber missed messages. Persistent session preserves them broker-side.

Same client ID on multiple devices. Broker disconnects the first when the second connects.

No will message. Subscribers don’t know device went offline.

Read loop too tight. 1 kHz sensor reads at 1 kHz publish rate floods the broker. Aggregate locally; publish at 1-10 Hz.

Buffer with no size limit. Disconnected device fills disk. Cap the buffer; alert on full.

Long-running sensor reads blocking the publisher. If the sensor sometimes hangs, the loop stalls. Read with timeout.

Wrapping Up

Go on the edge: single ARM binary, periph.io for sensors, Paho MQTT with auto-reconnect, BoltDB for buffering. Field-tested pattern. Monday: ingesting MQTT into TimescaleDB — the cloud side.