April 28, 2026 · 10 min read · by Muhammad Amal · programming

TL;DR: An end-to-end industrial AI pipeline is four boxes, and every junction between them is a place where data dies. Camera, broker, consumer, dashboard: each owns one job. Wire them with backpressure and observability, and the whole thing stays honest under load.

I’ve built variations of this pipeline three times for different factory floors, and the lesson is always the same: nobody fails at the model. The model trains, the model infers, the demo looks great. Deployments fail at the seams — the camera-to-broker handoff, the broker-to-consumer flow, the consumer-to-storage write. Each seam is a place where a network blip, a slow disk, or a schema drift quietly drops data, and you don’t find out until an operator asks why the dashboard went flat for an hour.

This post assembles a complete pipeline and treats the seams as first-class. A Jetson Orin runs vision inference. EMQX 5.8 is the transport. A Go 1.24 consumer ingests, deduplicates, and writes time-series data. Grafana 11 turns it into an operations dashboard. The earlier posts covered the pieces — edge vision to MQTT integration for the camera side, MQTT cluster optimization for the broker. Here we wire them into one system and make it observable.


The Architecture, One Page

Four components, three seams. Keep this picture in your head:

 [Jetson Orin]        [EMQX 5.8]         [Go consumer]      [Grafana 11]
  YOLO inference  -->  MQTT 5.0 broker --> ingest + dedup --> dashboard
  + Go bridge          QoS 1, durable     + TSDB writer       + alerting
       |                    |                   |                  |
   seam 1: buffer       seam 2: shared      seam 3: idempotent   reads TSDB
   across outage        subscription        batched writes

Each component owns exactly one responsibility. The Jetson infers and publishes. The broker transports and persists in flight. The consumer makes ingestion idempotent and durable. Grafana visualizes and alerts. When something breaks, this separation tells you immediately which box to look in.

The data contract is the DetectionEvent schema from the integration post — versioned, with an event_id for deduplication and two timestamps for latency measurement. Every component agrees on that schema, and that agreement is what holds the system together.

Seam 1, Camera to Broker

The Jetson runs YOLO inference and a Go bridge process. The bridge owns the MQTT connection, buffers detections to disk during an outage, and publishes at QoS 1. That's covered in detail in the edge vision integration post, so here I'll only restate the contract: the bridge guarantees at-least-once delivery of every detection produced, even across a power loss, as long as the disk survives.

What matters for the end-to-end view is what the bridge publishes onto:

vision/jawa-barat/site-12/cam-north-3/detection   # QoS 1
vision/jawa-barat/site-12/cam-north-3/status      # retained LWT

The consumer never talks to the camera. It talks to the broker. That decoupling is the whole point — you can restart the consumer, redeploy it, or run three of them, and the camera neither knows nor cares.
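Because the consumer only ever sees the broker, camera identity arrives through the topic, and a small parser can recover it. This sketch assumes the five-level layout of the example topics above; the level names Region, Site, Camera, and Kind are hypothetical labels, not part of any published contract.

```go
package main

import (
	"fmt"
	"strings"
)

// CameraTopic is the identity encoded in vision/<region>/<site>/<camera>/<kind>.
// The level names are hypothetical labels inferred from the example topics.
type CameraTopic struct {
	Region, Site, Camera, Kind string
}

// ParseCameraTopic splits a concrete topic (as delivered, no wildcards)
// and rejects anything that does not have the expected shape.
func ParseCameraTopic(topic string) (CameraTopic, error) {
	parts := strings.Split(topic, "/")
	if len(parts) != 5 || parts[0] != "vision" {
		return CameraTopic{}, fmt.Errorf("unexpected topic %q", topic)
	}
	for _, p := range parts[1:] {
		if p == "" {
			return CameraTopic{}, fmt.Errorf("empty level in topic %q", topic)
		}
	}
	return CameraTopic{Region: parts[1], Site: parts[2], Camera: parts[3], Kind: parts[4]}, nil
}
```

In a handler, msg.Topic() would be the input; a mismatch between the topic's camera and any camera field in the payload is a cheap signal of a misconfigured bridge.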

Seam 2, Broker to Consumer

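The consumer subscribes with a shared subscription so multiple consumer instances load-balance the detection stream. The $share/ syntax is standardized in MQTT 5.0, and EMQX also honors it for MQTT 3.1.1 clients such as the Paho Go client used below: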

$share/vision-ingest/vision/+/+/+/detection

Run at least two consumer instances. With a shared subscription the broker round-robins detections across them, so restarting one consumer for a deploy doesn't pause ingestion: the surviving instance picks up the slack. This is the consumer-side mirror of the broker fault tolerance from the cluster optimization post.
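The group name in that filter is not part of any topic. The broker strips $share/vision-ingest/, matches the remainder like an ordinary filter, and hands each matching message to one member of the group. The sketch below mimics that parsing and matching to show which topics the consumers receive; it illustrates MQTT filter semantics and is not EMQX's implementation. The helper names are hypothetical.

```go
package main

import "strings"

// splitShared separates "$share/<group>/<filter>" into its parts.
// ok is false for an ordinary, non-shared subscription.
func splitShared(sub string) (group, filter string, ok bool) {
	rest, shared := strings.CutPrefix(sub, "$share/")
	if !shared {
		return "", sub, false
	}
	group, filter, ok = strings.Cut(rest, "/")
	return group, filter, ok && group != "" && filter != ""
}

// matches reports whether a concrete topic matches an MQTT topic filter:
// '+' matches exactly one level, a final '#' matches zero or more levels.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	// Topics starting with '$' are not matched by a leading wildcard.
	if strings.HasPrefix(topic, "$") && (f[0] == "+" || f[0] == "#") {
		return false
	}
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1 // '#' is only valid as the last level
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}
```

One consequence worth noticing: the retained status topic does not match this filter, so the ingest consumers never see LWT messages, and the online-status panel needs some other path from that topic to the dashboard.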

Here’s the consumer’s MQTT side in Go 1.24, including the manual-ack pattern that’s the key to not losing data:

// consumer.go
package main

import (
	"context"
	"encoding/json"
	"log/slog"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

type Consumer struct {
	client mqtt.Client
	sink   *Sink // batched TSDB writer
	log    *slog.Logger
}

func NewConsumer(brokerURL, clientID string, sink *Sink,
	log *slog.Logger) (*Consumer, error) {

	c := &Consumer{sink: sink, log: log}

	opts := mqtt.NewClientOptions().
		AddBroker(brokerURL).
		SetClientID(clientID).
		SetCleanSession(false). // durable: keep the subscription
		SetKeepAlive(30 * time.Second).
		SetAutoReconnect(true).
		SetMaxReconnectInterval(60 * time.Second).
		SetOrderMatters(false).  // allow concurrent handler dispatch
		SetAutoAckDisabled(true) // manual ack: msg.Ack() only after the sink accepts

	opts.SetOnConnectHandler(func(cl mqtt.Client) {
		tok := cl.Subscribe(
			"$share/vision-ingest/vision/+/+/+/detection",
			1, c.onMessage)
		tok.Wait()
		if err := tok.Error(); err != nil {
			log.Error("subscribe failed", "err", err)
		}
	})

	c.client = mqtt.NewClient(opts)
	tok := c.client.Connect()
	tok.Wait()
	return c, tok.Error()
}

func (c *Consumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
	var ev DetectionEvent
	if err := json.Unmarshal(msg.Payload(), &ev); err != nil {
		// Malformed payload: ack and drop. Re-delivering it
		// will never succeed and would block the queue.
		c.log.Error("drop malformed message",
			"topic", msg.Topic(), "err", err)
		msg.Ack()
		return
	}
	if ev.SchemaVersion != "detection.v1" {
		c.log.Warn("unknown schema, dropping", "v", ev.SchemaVersion)
		msg.Ack()
		return
	}
	// Hand to the sink; ack only once it is durably accepted.
	if err := c.sink.Enqueue(context.Background(), &ev); err != nil {
		c.log.Warn("sink enqueue failed, not acking",
			"event_id", ev.EventID, "err", err)
		return // no Ack — broker will redeliver
	}
	msg.Ack()
}

Manual ack is the decision that makes this pipeline reliable. By default the Paho client acks a QoS 1 message the moment the handler returns. If the process crashes between handler return and the data reaching storage, the message is lost, because the broker thinks it was delivered. With auto-ack disabled, we call msg.Ack() only after the sink has accepted the event. Crash before that, and the broker redelivers. Combined with event_id deduplication, this approximates exactly-once processing on top of at-least-once delivery.

Note the two drop-and-ack paths: a malformed payload and an unknown schema version. Both are messages that will never succeed. Refusing to ack them would wedge the queue forever — a poison message that redelivers in an infinite loop. Ack, log loudly, move on.
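Keeping the ack decision in a pure function makes the poison-message policy testable without a broker. A minimal sketch, assuming the JSON tag names schema_version and event_id: it says whether a message should be acked immediately and dropped, or written first and acked afterwards. Treating an empty event_id as poison is an extra rule this sketch assumes, since such an event could never be deduplicated.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Action tells the MQTT handler what to do with the broker's copy.
type Action int

const (
	AckAfterWrite Action = iota // valid: hand to the sink, ack once it accepts
	AckAndDrop                  // poison: ack now and log, so it never redelivers
)

// header holds just the fields the ack decision needs (tag names assumed).
type header struct {
	SchemaVersion string `json:"schema_version"`
	EventID       string `json:"event_id"`
}

// classify decides the ack policy for one payload. Only failures that can
// never succeed on redelivery are acked and dropped; sink errors are not
// decided here, because they are transient and must stay unacked.
func classify(payload []byte) (Action, *header, error) {
	var h header
	if err := json.Unmarshal(payload, &h); err != nil {
		return AckAndDrop, nil, fmt.Errorf("malformed payload: %w", err)
	}
	if h.SchemaVersion != "detection.v1" {
		return AckAndDrop, nil, fmt.Errorf("unknown schema %q", h.SchemaVersion)
	}
	if h.EventID == "" { // assumed extra rule: cannot be deduplicated
		return AckAndDrop, nil, fmt.Errorf("missing event_id")
	}
	return AckAfterWrite, &h, nil
}
```

The handler then reduces to a switch on the returned action, with the sink call living only in the AckAfterWrite branch.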

Seam 3, Consumer to Storage

The sink batches writes to a time-series database. I use a TSDB with a line-protocol write API; the principle holds for any of them. Batching is non-negotiable: one network write per detection will not keep up, and it makes the database the bottleneck.

// sink.go — idempotent batched time-series writer
package main

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"
)

type Sink struct {
	mu        sync.Mutex
	batch     []*DetectionEvent
	seen      map[string]time.Time // event_id -> first-seen, dedup
	batchSize int
	writeFn   func(context.Context, []*DetectionEvent) error
	log       *slog.Logger
}

func NewSink(batchSize int,
	writeFn func(context.Context, []*DetectionEvent) error,
	log *slog.Logger) *Sink {
	return &Sink{
		batch:     make([]*DetectionEvent, 0, batchSize),
		seen:      make(map[string]time.Time),
		batchSize: batchSize, writeFn: writeFn, log: log,
	}
}

// Enqueue dedups, buffers, and flushes when the batch is full.
// Returns nil when the event is accepted: buffered in memory for
// the next flush, or already seen. Buffered is not yet durable.
func (s *Sink) Enqueue(ctx context.Context, ev *DetectionEvent) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, dup := s.seen[ev.EventID]; dup {
		return nil // already processed; safe to ack upstream
	}
	s.seen[ev.EventID] = time.Now()
	s.batch = append(s.batch, ev)

	if len(s.batch) >= s.batchSize {
		return s.flushLocked(ctx)
	}
	return nil
}

func (s *Sink) flushLocked(ctx context.Context) error {
	if len(s.batch) == 0 {
		return nil
	}
	if err := s.writeFn(ctx, s.batch); err != nil {
		return fmt.Errorf("tsdb write of %d events: %w",
			len(s.batch), err)
	}
	s.log.Info("flushed batch", "count", len(s.batch))
	s.batch = s.batch[:0]
	return nil
}

// FlushLoop forces a flush on an interval so a partial batch
// is never stuck waiting for batchSize to fill.
func (s *Sink) FlushLoop(ctx context.Context, every time.Duration) {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
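		case <-ctx.Done():
			// ctx is already cancelled, so the final flush gets its own
			// bounded context instead of one that could block forever.
			fctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			s.mu.Lock()
			if err := s.flushLocked(fctx); err != nil {
				s.log.Error("final flush failed", "err", err)
			}
			s.mu.Unlock()
			cancel()
			return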
		case <-t.C:
			s.mu.Lock()
			if err := s.flushLocked(ctx); err != nil {
				s.log.Error("interval flush failed", "err", err)
			}
			s.mu.Unlock()
		}
	}
}

// GC trims the dedup set so it does not grow forever.
func (s *Sink) GC(ctx context.Context, ttl time.Duration) {
	t := time.NewTicker(ttl / 2)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			s.mu.Lock()
			cutoff := time.Now().Add(-ttl)
			for id, seen := range s.seen {
				if seen.Before(cutoff) {
					delete(s.seen, id)
				}
			}
			s.mu.Unlock()
		}
	}
}

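Three things here earn their place. The seen map is the deduplication layer: it absorbs QoS 1 redelivery and buffer-drain repeats so a single instance never writes the same detection twice. FlushLoop guarantees a partial batch flushes on a timer; without it, a low-traffic camera's last few detections sit unwritten indefinitely. And GC bounds the seen map, because an unbounded dedup set is a slow memory leak that crashes the consumer after a week. The TTL should comfortably exceed the broker's message_retention_period so a redelivered message is still recognized as a duplicate. One limit to keep in mind: the seen map lives in one process, and with two instances behind a shared subscription a repeat can land on the other instance, so the TSDB write itself should also be idempotent (for example, keyed so that rewriting the same event overwrites the same point).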

One honest caveat: returning nil from Enqueue when the event is merely buffered (not yet flushed) means a consumer crash with a non-full batch loses those buffered events even though we acked them upstream. For most monitoring workloads, a small batchSize (say 50) and a short flush interval (2 seconds) keep that window small enough to accept. If you cannot tolerate it at all, append each event to a local write-ahead log (WAL) and fsync it before acking, the same pattern as the edge bridge's disk buffer.

The Grafana Dashboard

Grafana 11 reads the TSDB. The dashboard should answer operator questions, not show off the model. Provision it as code so it’s reproducible:

# grafana/provisioning/dashboards/industrial-ai.yaml
apiVersion: 1
providers:
  - name: industrial-ai
    orgId: 1
    folder: "Factory Floor"
    type: file
    disableDeletion: false
    updateIntervalSeconds: 30
    options:
      path: /var/lib/grafana/dashboards

The panels that actually matter on a factory floor:

  • Detections per minute, per camera. A flat line means a dead camera or a dead seam. This is the first thing an operator should see.
  • Inference latency. inferred_at minus captured_at, p50 and p99. A rising p99 means the Jetson is thermally throttling or overloaded.
  • End-to-end lag. Ingest wall-clock time minus captured_at. This catches a slow seam anywhere in the chain.
  • Defect class breakdown. A stacked count per detection class — the panel the quality team actually uses.
  • Camera online status. Driven by the retained LWT topic; red the instant a camera disconnects.

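Wire an alert on the detections-per-minute panel. If any camera reports zero detections for five minutes during a shift, page someone. That single alert catches most seam failures faster than any log. Because a dead camera often produces no data points at all rather than a zero, set the rule's no-data handling to fire as well; otherwise the very failure the alert exists for can pass silently.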

# Grafana alert rule (conceptual)
WHEN  detections_per_minute{camera=~".+"} == 0
FOR   5m
DURING shift hours
THEN  notify ops-oncall

Common Pitfalls

  • Auto-ack on the consumer. The default acks as soon as the handler returns, before the data is stored; a crash loses the message. Disable auto-ack and call msg.Ack() only after the sink accepts the event.
  • No poison-message handling. A malformed payload that’s never acked redelivers forever and wedges the queue. Ack-and-drop with a loud log.
  • Per-event TSDB writes. Makes the database the bottleneck. Batch, and flush on a timer for partial batches.
  • Unbounded dedup set. The seen map grows until the consumer OOMs. GC it on a TTL longer than the broker retention.
  • One consumer instance. A deploy or crash pauses all ingestion. Run two behind a shared subscription.
  • A dashboard that shows the model, not the operations. Operators need detections-per-minute and online status, not confidence histograms.

Troubleshooting

Symptom: Dashboard goes flat for one camera; others are fine. Cause: Camera offline, or its bridge can’t reach the broker. Fix: Check the retained status topic and the bridge logs on that Jetson; seam 1 is the suspect.

Symptom: Dashboard flat for all cameras at once. Cause: Broker down, consumer down, or TSDB write failures. Fix: Check emqx ctl cluster status, then consumer logs for sink errors; seam 2 or 3.

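Symptom: Detection counts higher than the cameras actually produced. Cause: Dedup not working, because the seen map was GC'd too aggressively, event_id is not stable, or a repeat landed on a different consumer instance whose seen map never saw the original. Fix: Confirm event_id is generated once at the camera and the GC TTL exceeds broker retention, and make the TSDB write idempotent so cross-instance repeats overwrite rather than add.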

Symptom: End-to-end lag climbs steadily under load. Cause: TSDB writes can’t keep up; batches back up in the consumer. Fix: Increase batchSize, add a consumer instance, or check TSDB disk I/O.

Symptom: A single bad message stalls the whole consumer. Cause: A poison message that’s never acked redelivers in a loop. Fix: Ack-and-drop malformed payloads and unknown schema versions.

Wrapping Up

An end-to-end industrial AI pipeline is only as reliable as its weakest seam, and the seams are where the engineering actually is — buffering at the camera, shared subscriptions and durability at the broker, manual ack and idempotent batched writes at the consumer. Build each box to own one job, make the whole chain observable in Grafana, and alert on detections-per-minute so a dead seam pages you before an operator notices.

The Grafana provisioning documentation covers dashboard-as-code and alerting setup in full.
