Connecting Edge Vision Inference to an MQTT Telemetry Backbone
TL;DR:
- A vision model that infers is useless until its results reach a system that acts.
- The bridge between inference and MQTT is where most edge deployments quietly fail.
- A small Go 1.24 process with a buffer, a schema, and reconnect logic is the whole job done right.
An edge vision model on a Jetson Orin does one thing well: it turns frames into structured detections. A bounding box, a class, a confidence score, a timestamp. What it does not do is get those detections anywhere useful. That gap — between “the model produced a result” and “the telemetry backbone has the result” — is the integration nobody writes a blog post about, and it’s exactly where edge deployments fall over.
I’ve debugged this failure mode more than once. The camera works. The model works. The dashboard works. But the network at the factory edge is flaky, the broker occasionally restarts, and the Python inference loop publishes MQTT messages inline with client.publish() and no buffering. A 30-second network blip means 30 seconds of detections vanish into the void. The model saw the defect; nobody downstream ever found out.
The fix is a dedicated bridge: a small, single-purpose process that sits between the inference loop and the broker, owns the MQTT connection, buffers across outages, and enforces a schema. This post builds that bridge in Go 1.24, publishing to an EMQX 5.8 backbone. The broker side is covered in MQTT cluster optimization; here we focus on the edge process that feeds it.
Why a Separate Bridge Process
The instinct is to publish MQTT directly from the inference code. Resist it. The inference loop has one hard real-time job — pull a frame, run the model, free the GPU buffer — and anything that blocks it drops frames. A publish() call that stalls because the network is down stalls the camera.
Separating the bridge gives you four things:
- The inference loop never blocks on the network. It hands a detection to the bridge over a local channel and moves on.
- Buffering lives in one place. The bridge holds an on-disk queue; an outage is absorbed, not lost.
- Reconnect logic lives in one place. MQTT reconnect with backoff is fiddly. Write it once, in the bridge.
- The schema is enforced at one boundary. Every message on the wire is validated before it leaves the edge.
The inference process (Python, because that’s where the model runtime lives) and the bridge (Go, because reliable concurrent I/O is what Go is for) talk over a local Unix domain socket. Cheap, fast, no network involved.
The Wire Schema
Define the message before writing any code. A loose schema is technical debt that detonates months later when a consumer can’t parse a field. Here’s the detection event, versioned in the topic and in the payload:
```go
// detection.go: the wire contract, shared by encoder and tests
package bridge

type BBox struct {
	X      float32 `json:"x"`
	Y      float32 `json:"y"`
	Width  float32 `json:"w"`
	Height float32 `json:"h"`
}

type Detection struct {
	Class      string  `json:"class"`
	Confidence float32 `json:"confidence"`
	Box        BBox    `json:"box"`
}

type DetectionEvent struct {
	SchemaVersion string      `json:"schema_version"` // "detection.v1"
	EventID       string      `json:"event_id"`       // UUID, dedup key
	DeviceID      string      `json:"device_id"`
	CameraID      string      `json:"camera_id"`
	CapturedAt    string      `json:"captured_at"` // RFC3339, frame time
	InferredAt    string      `json:"inferred_at"` // RFC3339, post-model
	ModelName     string      `json:"model_name"`
	Detections    []Detection `json:"detections"`
	FrameWidth    int         `json:"frame_width"`
	FrameHeight   int         `json:"frame_height"`
}
```
Two timestamps matter. captured_at is when the frame was grabbed; inferred_at is when the model finished. The gap between them is your inference latency (including any preprocessing and queueing ahead of the model), measurable downstream for free. event_id is the dedup key: the consumer uses it to deduplicate under MQTT QoS 1 redelivery, the pattern from QoS and persistence tuning.
Topic Structure for Inference Results
Mirror the sensor topic tree so vision detections and sensor readings live in one coherent namespace:
```text
vision/{region}/{site}/{camera_id}/detection
vision/jawa-barat/site-12/cam-north-3/detection
vision/jawa-barat/site-12/cam-north-3/status   # LWT, retained
```
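Building topic strings by hand is how a stray slash ends up pushing camera_id to the wrong level. Below is a small sketch of a hypothetical TopicBase helper. It assembles vision/{region}/{site}/{camera_id} and rejects any segment that is empty or contains '/', '+', or '#'. MQTT does not allow wildcards in topic names you publish to. The result is what NewPublisher expects as topicBase; the publisher appends /detection and /status itself.

```go
package main

import (
	"fmt"
	"strings"
)

// TopicBase builds vision/{region}/{site}/{camera_id}. Each segment must be
// exactly one non-empty topic level with no '/' and no MQTT wildcards.
func TopicBase(region, site, cameraID string) (string, error) {
	parts := []string{"vision", region, site, cameraID}
	for _, p := range parts[1:] {
		if p == "" || strings.ContainsAny(p, "/+#") {
			return "", fmt.Errorf("invalid topic segment %q", p)
		}
	}
	return strings.Join(parts, "/"), nil
}
```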
Keep camera ID at a fixed level. Consumers subscribe with a shared subscription so detection processing load-balances across worker instances:
```text
$share/vision-ingest/vision/+/+/+/detection
```
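The broker does the matching, but its rules explain why camera_id has to stay at a fixed depth. Each + matches exactly one level, so a topic with one level too many or too few silently misses the subscription. The sketch below strips the $share/{group}/ prefix and applies the standard MQTT wildcard rules. SplitShared and Matches are illustrative helpers of mine, not an EMQX or paho API.

```go
package main

import "strings"

// SplitShared separates "$share/{group}/{filter}" into group and filter.
// For a plain (non-shared) filter it returns an empty group.
func SplitShared(sub string) (group, filter string) {
	if rest, ok := strings.CutPrefix(sub, "$share/"); ok {
		if g, f, ok := strings.Cut(rest, "/"); ok {
			return g, f
		}
	}
	return "", sub
}

// Matches reports whether topic matches an MQTT filter: '+' matches exactly
// one level, a final '#' matches the remaining levels (including none), and
// topics starting with '$' are not matched by a leading wildcard.
func Matches(filter, topic string) bool {
	if strings.HasPrefix(topic, "$") &&
		(strings.HasPrefix(filter, "+") || strings.HasPrefix(filter, "#")) {
		return false
	}
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1 // '#' is only valid as the last level
		}
		if i >= len(t) {
			return false // topic has fewer levels than the filter
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}
```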
The Go Bridge, End to End
The bridge has three parts: a local socket server receiving detections from the inference loop, an on-disk buffer, and an MQTT publisher with reconnect. Here’s the publisher core using paho.mqtt.golang v1.5.0.
```go
// publisher.go
package bridge

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"log/slog"
	"os"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

type Publisher struct {
	client    mqtt.Client
	topicBase string
	deviceID  string
	log       *slog.Logger
}

func NewPublisher(brokerURL, caPath, deviceID, topicBase string,
	log *slog.Logger) (*Publisher, error) {
	caPEM, err := os.ReadFile(caPath)
	if err != nil {
		return nil, fmt.Errorf("read ca: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("ca pem invalid")
	}
	p := &Publisher{topicBase: topicBase, deviceID: deviceID, log: log}
	opts := mqtt.NewClientOptions().
		AddBroker(brokerURL).
		SetClientID("vision-bridge-" + deviceID).
		SetTLSConfig(&tls.Config{RootCAs: pool, MinVersion: tls.VersionTLS12}).
		SetCleanSession(false). // durable session
		SetKeepAlive(30 * time.Second).
		SetConnectRetry(true). // retry the initial connect
		SetConnectRetryInterval(5 * time.Second).
		SetAutoReconnect(true).
		SetMaxReconnectInterval(60 * time.Second).
		SetConnectionLostHandler(func(_ mqtt.Client, err error) {
			log.Warn("mqtt connection lost", "err", err)
		}).
		SetOnConnectHandler(func(_ mqtt.Client) {
			log.Info("mqtt connected", "broker", brokerURL)
		})

	// Last Will: announce this camera offline if we die ungracefully
	willTopic := topicBase + "/status"
	will, _ := json.Marshal(map[string]string{
		"device_id": deviceID, "state": "offline",
	})
	opts.SetWill(willTopic, string(will), 1, true)

	p.client = mqtt.NewClient(opts)
	tok := p.client.Connect()
	// With SetConnectRetry(true) this token completes only once a connection
	// is up, so a timeout means "broker unreachable for now", not a fatal
	// error. The client keeps retrying in the background; until it connects,
	// Publish fails and the runner buffers to disk.
	if !tok.WaitTimeout(30 * time.Second) {
		log.Warn("mqtt not connected yet, retrying in background",
			"broker", brokerURL)
		return p, nil
	}
	if err := tok.Error(); err != nil {
		return nil, fmt.Errorf("mqtt connect: %w", err)
	}
	return p, nil
}

// Publish returns an error if the broker did not acknowledge.
// The caller decides whether to re-buffer on failure.
func (p *Publisher) Publish(ev *DetectionEvent) error {
	// Fail fast while disconnected or reconnecting, instead of waiting out
	// the 10 s publish timeout on every event during an outage.
	if !p.client.IsConnectionOpen() {
		return fmt.Errorf("publish %s: not connected", ev.EventID)
	}
	payload, err := json.Marshal(ev)
	if err != nil {
		return fmt.Errorf("marshal event %s: %w", ev.EventID, err)
	}
	topic := p.topicBase + "/detection"
	tok := p.client.Publish(topic, 1, false, payload) // QoS 1
	if !tok.WaitTimeout(10 * time.Second) {
		return fmt.Errorf("publish timeout for %s", ev.EventID)
	}
	if err := tok.Error(); err != nil {
		return fmt.Errorf("publish %s: %w", ev.EventID, err)
	}
	return nil
}
```
SetAutoReconnect(true) and SetConnectRetry(true) together mean the client recovers from both a mid-session drop and a broker that’s down at startup. But auto-reconnect alone doesn’t save the messages produced during the outage — that’s the buffer’s job.
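The LWT setup has one gap. The will publishes a retained offline status, but nothing publishes a matching online status. After a crash and restart, the retained message keeps saying offline even while the bridge is healthy. Below is a minimal sketch that assumes a hypothetical PublishFunc adapter over the MQTT client, so the example has no external dependency. With paho, the adapter would wrap client.Publish and wait on the token. The sketch reuses the will's topic, QoS 1, and retain flag, so each message overwrites the other.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PublishFunc is a hypothetical adapter over the MQTT client's publish call.
type PublishFunc func(topic string, qos byte, retained bool, payload []byte) error

// StatusPayload mirrors the shape of the LWT message built in NewPublisher.
func StatusPayload(deviceID, state string) ([]byte, error) {
	return json.Marshal(map[string]string{"device_id": deviceID, "state": state})
}

// AnnounceOnline overwrites the retained status with "online", using the
// same topic, QoS 1 and retain flag as the Last Will.
func AnnounceOnline(pub PublishFunc, topicBase, deviceID string) error {
	payload, err := StatusPayload(deviceID, "online")
	if err != nil {
		return fmt.Errorf("marshal status: %w", err)
	}
	return pub(topicBase+"/status", 1, true, payload)
}
```

Calling it from the on-connect handler covers reconnects as well as the first connect. Before blocking on a publish token inside that handler, check the paho documentation for how your version runs it.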
The On-Disk Buffer
A memory-only buffer dies with the process. For an edge device that may lose power, the buffer has to be on disk. I use a simple append-only file of JSON lines: every write is fsynced, the file is truncated once drained, and maxBytes sets the size cap. Durable, and easy to reason about.
```go
// buffer.go: bounded on-disk queue for outage absorption
package bridge

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"sync"
)

type DiskBuffer struct {
	mu       sync.Mutex
	path     string
	maxBytes int64
	f        *os.File
	w        *bufio.Writer
}

func NewDiskBuffer(path string, maxBytes int64) (*DiskBuffer, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o644)
	if err != nil {
		return nil, fmt.Errorf("open buffer: %w", err)
	}
	return &DiskBuffer{path: path, maxBytes: maxBytes,
		f: f, w: bufio.NewWriter(f)}, nil
}

// Append writes an event as one JSON line and fsyncs it. maxBytes is stored
// but not enforced here: a separate compaction step must drop the oldest
// lines once the file grows past it.
func (b *DiskBuffer) Append(ev *DetectionEvent) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	line, err := json.Marshal(ev)
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}
	if _, err := b.w.Write(append(line, '\n')); err != nil {
		return fmt.Errorf("buffer write: %w", err)
	}
	if err := b.w.Flush(); err != nil {
		return fmt.Errorf("buffer flush: %w", err)
	}
	// fsync so a power loss does not lose the just-written event
	return b.f.Sync()
}

// Drain reads all buffered events and truncates the file.
func (b *DiskBuffer) Drain() ([]*DetectionEvent, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, err := b.f.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}
	var out []*DetectionEvent
	sc := bufio.NewScanner(b.f)
	sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for sc.Scan() {
		var ev DetectionEvent
		if err := json.Unmarshal(sc.Bytes(), &ev); err != nil {
			continue // skip a corrupt line rather than stall
		}
		out = append(out, &ev)
	}
	if err := sc.Err(); err != nil {
		return out, fmt.Errorf("scan: %w", err)
	}
	if err := b.f.Truncate(0); err != nil {
		return out, fmt.Errorf("truncate: %w", err)
	}
	_, err := b.f.Seek(0, io.SeekStart)
	return out, err
}
```
The fsync on every append is deliberate. A factory-floor Jetson loses power without warning, and a detection that lived only in the OS page cache is gone. The cost is real, since fsync per event is not free. In this design, though, Append runs only when a live publish fails, and a vision pipeline producing a handful of detection events per second stays well within budget even during an outage. If your event rate is much higher, batch the fsync.
Wiring It Together
The run loop ties the socket receiver, buffer, and publisher with a clear policy: try to publish live; on failure, buffer; on reconnect, drain.
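```go
// runner.go
package bridge

import (
	"context"
	"log/slog"
	"time"
)

type Runner struct {
	pub *Publisher
	buf *DiskBuffer
	in  <-chan *DetectionEvent // fed by the Unix socket receiver
	log *slog.Logger
}

func (r *Runner) Run(ctx context.Context) error {
	drainTicker := time.NewTicker(15 * time.Second)
	defer drainTicker.Stop()
	for {
		select {
		case <-ctx.Done():
			r.log.Info("runner shutting down")
			return ctx.Err()
		case ev, ok := <-r.in:
			if !ok { // receiver closed the channel; a nil event would panic
				r.log.Info("input closed, runner exiting")
				return nil
			}
			if err := r.pub.Publish(ev); err != nil {
				r.log.Warn("publish failed, buffering",
					"event_id", ev.EventID, "err", err)
				r.rebuffer(ev)
			}
		case <-drainTicker.C:
			// Draining while the broker is unreachable would only rewrite
			// the buffer file, so wait until the connection is back.
			if !r.pub.client.IsConnectionOpen() {
				continue
			}
			events, err := r.buf.Drain()
			if err != nil {
				r.log.Error("buffer drain failed", "err", err)
				continue
			}
			for i, ev := range events {
				if err := r.pub.Publish(ev); err != nil {
					// Still down. Drain already truncated the file, so put back
					// this event AND every one after it, then stop draining.
					r.log.Warn("re-buffering on failed drain",
						"event_id", ev.EventID, "remaining", len(events)-i)
					for _, rest := range events[i:] {
						r.rebuffer(rest)
					}
					break
				}
			}
		}
	}
}

// rebuffer appends ev to the disk buffer, logging rather than returning failures.
func (r *Runner) rebuffer(ev *DetectionEvent) {
	if err := r.buf.Append(ev); err != nil {
		r.log.Error("buffer append failed", "event_id", ev.EventID, "err", err)
	}
}
```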
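The drain policy is the subtle part. When a drain attempt fails, the runner re-buffers the failed event and stops the drain rather than hammering a broker that's still down. The next ticker tick retries. This avoids a tight retry loop that burns CPU and disk during an extended outage. One window remains: Drain truncates the file before any of the drained events are acknowledged, so a crash in the middle of a drain loses that batch. A stricter design reads, publishes, and only then truncates.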
The Python side is thin — it just pushes JSON over the socket:
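```python
# inference side: hand off to the bridge, never block on the network
import json, socket

def emit(event: dict, sock_path="/run/vision-bridge.sock", timeout_s=0.05) -> bool:
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.settimeout(timeout_s)  # bound connect() and sendall()
            s.connect(sock_path)
            s.sendall((json.dumps(event) + "\n").encode())
        return True
    except OSError:  # bridge down or backed up: report it, don't stall the camera
        return False
```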
Common Pitfalls
- Publishing MQTT directly from the inference loop. A blocked publish() stalls the camera. Always hand off to a separate bridge.
- Memory-only buffering. A process restart or power loss loses every buffered detection. The buffer must be on disk with fsync.
- No schema version. The first time a consumer can't parse a new field, you'll wish you had schema_version in the payload and the topic.
- Unbounded buffer. A multi-day outage fills the disk and crashes the Jetson. Cap the buffer and drop oldest on overflow.
- Tight retry loop during an outage. Re-buffer and back off; don't spin on a broker that's down.
- SetCleanSession(true) on the bridge. Throws away the durable session, so QoS 1 inflight messages are lost on a broker restart.
Troubleshooting
Symptom: Camera drops frames whenever the network hiccups.
Cause: MQTT publish is inline with the inference loop and blocks on the network.
Fix: Move publishing into the separate bridge; the loop only writes to a local socket.
Symptom: Detections produced during an outage never arrive after reconnect.
Cause: Buffer is memory-only and was lost, or drain logic isn’t wired.
Fix: Use the on-disk DiskBuffer and confirm the drain ticker fires after reconnect.
Symptom: Bridge connects, then drops every 30-60 seconds.
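Cause: Keepalive pings not answered in time on a high-latency or lossy link, or the broker reaping the session.
Fix: Align SetKeepAlive with the broker's keepalive_multiplier (EMQX closes a connection that stays silent for longer than keepalive × keepalive_multiplier); check broker logs for the disconnect reason.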
Symptom: Consumer sees duplicate detection events.
Cause: Expected under QoS 1 redelivery, amplified by buffer drain re-publishing.
Fix: Deduplicate downstream on event_id.
Symptom: Jetson disk fills up over a long outage.
Cause: Unbounded buffer growth.
Fix: Enforce maxBytes and compact, dropping the oldest events.
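For the duplicate-events entry, here is a consumer-side sketch: a bounded window of recently seen event_id values, evicted oldest-first. Deduper is a hypothetical helper. Check two assumptions before relying on it. First, the window must be large enough to cover a drained backlog after a long outage. Second, with a $share subscription a duplicate can land on a different worker, so per-process memory only catches duplicates that reach the same instance. A store shared by all workers, such as a unique key on event_id, closes that gap.

```go
package main

import "container/list"

// Deduper remembers the last `capacity` event IDs in arrival order.
// Memory stays bounded; an ID that has aged out is accepted again.
type Deduper struct {
	capacity int
	order    *list.List               // oldest at Front
	seen     map[string]*list.Element // event_id -> position in order
}

func NewDeduper(capacity int) *Deduper {
	return &Deduper{capacity: capacity, order: list.New(), seen: make(map[string]*list.Element)}
}

// Seen returns true if id is a duplicate; otherwise it records id,
// evicting the oldest remembered ID when the window is full.
func (d *Deduper) Seen(id string) bool {
	if _, ok := d.seen[id]; ok {
		return true
	}
	d.seen[id] = d.order.PushBack(id)
	if d.order.Len() > d.capacity {
		oldest := d.order.Front()
		d.order.Remove(oldest)
		delete(d.seen, oldest.Value.(string))
	}
	return false
}
```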
What’s Next
The bridge is the unglamorous component that decides whether an edge vision deployment is reliable or merely demo-ready. Keep it small, keep the buffer on disk, and keep the schema versioned. With inference results flowing onto the backbone, the last step is closing the loop into a dashboard; see the end-to-end industrial AI pipeline.
The Eclipse Paho Go client documentation covers the full set of connection and reconnect options used here.