Bridging OPC UA and Modbus to MQTT in Go: A Step-by-Step Guide
TL;DR: Industrial gear speaks OPC UA or Modbus, not MQTT. The bridge in this post reads from both, normalizes the values, and publishes to MQTT 5 with user properties. It uses OPC UA subscriptions for fresh data, Modbus polling for older PLCs, and a strict contract on the published topic shape. Go 1.24 with gopcua and goburrow/modbus does the job in about 600 lines.
If you’ve worked with industrial customers, you’ve met the protocols. OPC UA is the modern one, encrypted and structured, supported by basically every PLC vendor since 2015. Modbus TCP is the older one, plain TCP and bytes, supported by everything ever made. Neither is MQTT. To get sensor data into your modern stack (Kafka, time-series DB, dashboards), you need a bridge.
This post walks through building that bridge in Go 1.24. We’ll connect to an OPC UA server, set up subscriptions, poll a Modbus device, normalize both into a common schema, and publish to MQTT 5. The code targets a small edge gateway (Raspberry Pi or industrial PC), but runs anywhere.
If you read my earlier post on EMQX clustering, this is the producer side. The bridge talks to the brokers we built there.
1. The bridge architecture
Two source goroutines (one OPC UA, one Modbus), one normalization channel, one MQTT publisher goroutine. Standard shape.
+-----------+      +-----------+
|  OPC UA   |      |  Modbus   |
|  client   |      |  client   |
|   (sub)   |      |  (poll)   |
+-----+-----+      +-----+-----+
      |                  |
      v                  v
   +-----------------------+
   |    normalize chan     |
   |      (cap=1024)       |
   +-----------+-----------+
               |
               v
        +-------------+
        |  MQTT pub   |
        |  (paho v5)  |
        +-------------+
               |
               v
         emqx://broker
The normalize channel is the seam. Anything upstream can be added or replaced (BACnet, EtherNet/IP) without changing the publisher. The publisher knows nothing about industrial protocols.
1.1 The common schema
Every reading the bridge emits looks like this:
type Reading struct {
	Source    string    `json:"source"`    // "opcua" or "modbus"
	DeviceID  string    `json:"device_id"` // PLC identifier
	Tag       string    `json:"tag"`       // logical name
	Value     float64   `json:"value"`
	Quality   string    `json:"quality"` // "good", "bad", "uncertain"
	Timestamp time.Time `json:"timestamp"`
	Unit      string    `json:"unit,omitempty"`
}
The topic shape is site/{site_id}/device/{device_id}/tag/{tag}. Always. Pin this contract early, because downstream consumers will lock onto it.
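The contract only holds if no identifier smuggles in a "/" or an MQTT wildcard, since those would change the number of topic levels or make the topic invalid for publishing. Here is a minimal sketch of a builder that enforces the shape. TopicFor and topicSegment are hypothetical names, and replacing offending characters with "_" is an assumption; rejecting the reading outright would be just as defensible.

```go
package main

import (
	"fmt"
	"strings"
)

// topicSegment makes a value safe to use as a single MQTT topic level by
// replacing the level separator, the wildcards, and NUL with '_'.
// The replacement character is an assumption, not part of the contract.
func topicSegment(s string) string {
	return strings.Map(func(r rune) rune {
		switch r {
		case '/', '+', '#', 0:
			return '_'
		}
		return r
	}, s)
}

// TopicFor builds site/{site_id}/device/{device_id}/tag/{tag}.
// Empty identifiers are rejected so the topic always has six levels.
func TopicFor(siteID, deviceID, tag string) (string, error) {
	for _, seg := range []string{siteID, deviceID, tag} {
		if seg == "" {
			return "", fmt.Errorf("topic: empty identifier in %q/%q/%q", siteID, deviceID, tag)
		}
	}
	return "site/" + topicSegment(siteID) +
		"/device/" + topicSegment(deviceID) +
		"/tag/" + topicSegment(tag), nil
}
```

With this in place, TopicFor("siteA", "plc/01", "temp#1") yields site/siteA/device/plc_01/tag/temp_1 instead of a topic with an extra level and a wildcard in it.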
2. OPC UA, the modern half
github.com/gopcua/opcua is the maintained Go OPC UA client. v0.7.x (April 2025) supports the security profiles you need.
2.1 Connection with security
// internal/source/opcua.go
package source

import (
	"context"
	"log/slog"
	"time"

	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
)

type OPCUAConfig struct {
	Endpoint     string
	SecurityMode string // "None", "Sign", "SignAndEncrypt"
	SecurityPol  string // "None", "Basic256Sha256"
	CertFile     string
	KeyFile      string
	Username     string
	Password     string
}

type OPCUA struct {
	Cfg      OPCUAConfig
	DeviceID string
	Tags     []OPCUATag
	Out      chan<- Reading
}

type OPCUATag struct {
	NodeID string
	Name   string
	Unit   string
}

func (o *OPCUA) Run(ctx context.Context) error {
	opts := []opcua.Option{
		opcua.SecurityMode(securityMode(o.Cfg.SecurityMode)),
		opcua.SecurityPolicy(o.Cfg.SecurityPol),
		opcua.SessionTimeout(60 * time.Second),
		opcua.AutoReconnect(true),
		opcua.ReconnectInterval(2 * time.Second),
	}
	if o.Cfg.CertFile != "" {
		opts = append(opts,
			opcua.CertificateFile(o.Cfg.CertFile),
			opcua.PrivateKeyFile(o.Cfg.KeyFile),
		)
	}
	if o.Cfg.Username != "" {
		opts = append(opts, opcua.AuthUsername(o.Cfg.Username, o.Cfg.Password))
	}

	c, err := opcua.NewClient(o.Cfg.Endpoint, opts...)
	if err != nil {
		return err
	}
	if err := c.Connect(ctx); err != nil {
		return err
	}
	defer c.Close(ctx)

	slog.Info("opcua connected", "endpoint", o.Cfg.Endpoint)
	return o.subscribe(ctx, c)
}

func securityMode(s string) ua.MessageSecurityMode {
	switch s {
	case "Sign":
		return ua.MessageSecurityModeSign
	case "SignAndEncrypt":
		return ua.MessageSecurityModeSignAndEncrypt
	default:
		return ua.MessageSecurityModeNone
	}
}
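A note on certificates. On opc.tcp endpoints OPC UA does not use TLS; its own secure channel authenticates both sides with X.509 application certificates, which are usually self-signed. The first time your client connects to a server, the server admin has to approve the client’s cert, and your client has to trust the server’s. This is a deployment step, not a code step.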
2.2 Subscriptions
OPC UA subscriptions are way better than polling. The server publishes only when a value changes, with a configurable deadband.
func (o *OPCUA) subscribe(ctx context.Context, c *opcua.Client) error {
	notifyCh := make(chan *opcua.PublishNotificationData, 16)
	// Recent gopcua releases (the ones where NewClient returns an error)
	// take the context on Subscribe itself.
	sub, err := c.Subscribe(ctx, &opcua.SubscriptionParameters{
		Interval: 250 * time.Millisecond,
	}, notifyCh)
	if err != nil {
		return err
	}
	defer sub.Cancel(ctx)

	// Add monitored items. The client handle is how notifications are
	// matched back to tags, so it must be unique per item.
	nodes := make([]*ua.MonitoredItemCreateRequest, 0, len(o.Tags))
	tagByHandle := make(map[uint32]OPCUATag, len(o.Tags))
	for i, t := range o.Tags {
		nid, err := ua.ParseNodeID(t.NodeID)
		if err != nil {
			return err
		}
		handle := uint32(i + 1)
		req := opcua.NewMonitoredItemCreateRequestWithDefaults(
			nid, ua.AttributeIDValue, handle,
		)
		nodes = append(nodes, req)
		tagByHandle[handle] = t
	}
	if _, err := sub.Monitor(ctx, ua.TimestampsToReturnBoth, nodes...); err != nil {
		return err
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case n := <-notifyCh:
			if n.Error != nil {
				slog.Warn("opcua notification", "err", n.Error)
				continue
			}
			switch v := n.Value.(type) {
			case *ua.DataChangeNotification:
				for _, item := range v.MonitoredItems {
					tag, ok := tagByHandle[item.ClientHandle]
					if !ok {
						continue
					}
					o.emit(item, tag)
				}
			}
		}
	}
}

func (o *OPCUA) emit(item *ua.MonitoredItemNotification, tag OPCUATag) {
	// Guard both pointers: a notification can carry a status without a value.
	if item.Value == nil || item.Value.Value == nil {
		return
	}
	// Normalize the numeric types a PLC commonly exposes to float64.
	var val float64
	switch v := item.Value.Value.Value().(type) {
	case float64:
		val = v
	case float32:
		val = float64(v)
	case int16:
		val = float64(v)
	case uint16:
		val = float64(v)
	case int32:
		val = float64(v)
	case uint32:
		val = float64(v)
	default:
		return // non-numeric or unhandled type: skip
	}

	quality := "good"
	if item.Value.Status != ua.StatusOK {
		quality = "bad"
	}
	r := Reading{
		Source:    "opcua",
		DeviceID:  o.DeviceID,
		Tag:       tag.Name,
		Value:     val,
		Quality:   quality,
		Timestamp: item.Value.SourceTimestamp,
		Unit:      tag.Unit,
	}
	select {
	case o.Out <- r:
	default:
		// backpressure: drop
	}
}
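Interval: 250ms is the requested publishing interval. The server batches queued changes and sends at most one notification message per interval, and it may revise the value you asked for. How often each node is sampled is a separate per-item setting (the sampling interval). For most industrial use cases, 250-500ms is right.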
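One gap in emit: it treats every status other than ua.StatusOK as "bad", so the schema’s "uncertain" quality is never produced, and good-with-detail codes get downgraded. OPC UA status codes carry their severity in the top two bits (00 good, 01 uncertain, 10 bad), which makes a three-way mapping cheap. A minimal sketch, assuming you pass in uint32(item.Value.Status); qualityFromStatus is a hypothetical helper name:

```go
package main

// qualityFromStatus maps an OPC UA status code to the schema's quality
// strings using the two severity bits (bits 30-31) of the code:
// 00 = good, 01 = uncertain, 10 = bad. The reserved value 11 is
// treated as bad here, which is an assumption on the safe side.
func qualityFromStatus(code uint32) string {
	switch code >> 30 {
	case 0:
		return "good"
	case 1:
		return "uncertain"
	default:
		return "bad"
	}
}
```

Whether consumers should act on "uncertain" readings is a decision for whoever owns the schema; the point is only that the bridge should not throw the distinction away.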
3. Modbus, the legacy half
Modbus TCP is brutally simple. You open a TCP connection, write a request with a function code and address, read the response. No subscriptions, no security, no nothing. You poll.
3.1 Modbus polling
// internal/source/modbus.go
package source

import (
	"context"
	"encoding/binary"
	"fmt"
	"log/slog"
	"math"
	"time"

	"github.com/goburrow/modbus"
)

type ModbusConfig struct {
	Address string // "10.0.0.5:502"
	SlaveID byte
	Timeout time.Duration
}

type ModbusTag struct {
	Name     string
	Register uint16
	Type     string  // "uint16", "int16", "float32", "int32"
	Scale    float64 // multiply raw by this
	Unit     string
}

type Modbus struct {
	Cfg      ModbusConfig
	DeviceID string
	Tags     []ModbusTag
	Interval time.Duration
	Out      chan<- Reading
}

func (m *Modbus) Run(ctx context.Context) error {
	handler := modbus.NewTCPClientHandler(m.Cfg.Address)
	handler.Timeout = m.Cfg.Timeout
	handler.SlaveId = m.Cfg.SlaveID
	if err := handler.Connect(); err != nil {
		return err
	}
	defer handler.Close()
	client := modbus.NewClient(handler)
	slog.Info("modbus connected", "addr", m.Cfg.Address)

	ticker := time.NewTicker(m.Interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case t := <-ticker.C:
			for _, tag := range m.Tags {
				v, err := m.read(client, tag)
				if err != nil {
					slog.Warn("modbus read", "tag", tag.Name, "err", err)
					continue
				}
				r := Reading{
					Source:    "modbus",
					DeviceID:  m.DeviceID,
					Tag:       tag.Name,
					Value:     v * tag.Scale,
					Quality:   "good",
					Timestamp: t, // Modbus has no timestamps; use the poll tick
					Unit:      tag.Unit,
				}
				select {
				case m.Out <- r:
				default:
					// backpressure: drop
				}
			}
		}
	}
}

// read fetches one tag and decodes it as big-endian.
func (m *Modbus) read(client modbus.Client, tag ModbusTag) (float64, error) {
	switch tag.Type {
	case "uint16":
		b, err := client.ReadHoldingRegisters(tag.Register, 1)
		if err != nil {
			return 0, err
		}
		return float64(binary.BigEndian.Uint16(b)), nil
	case "int16":
		b, err := client.ReadHoldingRegisters(tag.Register, 1)
		if err != nil {
			return 0, err
		}
		return float64(int16(binary.BigEndian.Uint16(b))), nil
	case "int32":
		b, err := client.ReadHoldingRegisters(tag.Register, 2)
		if err != nil {
			return 0, err
		}
		return float64(int32(binary.BigEndian.Uint32(b))), nil
	case "float32":
		b, err := client.ReadHoldingRegisters(tag.Register, 2)
		if err != nil {
			return 0, err
		}
		u := binary.BigEndian.Uint32(b)
		return float64(math.Float32frombits(u)), nil
	}
	// Fail loudly instead of publishing a bogus 0 for a misconfigured tag.
	return 0, fmt.Errorf("modbus: unsupported tag type %q", tag.Type)
}
The Modbus library’s calls block and take no context.Context; that’s the protocol’s age showing. If you need cancellation, run the call in a goroutine and select on its result and ctx.Done().
3.2 Coalescing register reads
A real Modbus PLC has a request rate ceiling, often around 50 ops/sec. If you’ve got 30 tags polling every 500ms, you’re at 60 ops/sec and will start failing requests. The fix: coalesce contiguous registers into a single read.
// Needs "sort" in the file's import block.

// readBlock is one coalesced read: Count registers starting at Start,
// covering Tags in register order.
type readBlock struct {
	Start uint16
	Count uint16
	Tags  []ModbusTag
}

// planReads groups tags by contiguous register ranges and returns
// a plan of (start, count) reads to issue.
func planReads(tags []ModbusTag) []readBlock {
	if len(tags) == 0 {
		return nil
	}
	// Sort a copy by register so the caller's slice is left untouched.
	sorted := append([]ModbusTag(nil), tags...)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].Register < sorted[j].Register
	})

	var blocks []readBlock
	cur := readBlock{Start: sorted[0].Register, Tags: []ModbusTag{sorted[0]}}
	cur.Count = registerWidth(sorted[0].Type)
	for i := 1; i < len(sorted); i++ {
		t := sorted[i]
		endOfCur := cur.Start + cur.Count
		// Extend the block only if the tag starts exactly where the block
		// ends and the result fits in one request (125 registers is the
		// protocol limit for Read Holding Registers).
		if t.Register == endOfCur && cur.Count+registerWidth(t.Type) <= 125 {
			cur.Count += registerWidth(t.Type)
			cur.Tags = append(cur.Tags, t)
		} else {
			blocks = append(blocks, cur)
			cur = readBlock{Start: t.Register, Count: registerWidth(t.Type), Tags: []ModbusTag{t}}
		}
	}
	blocks = append(blocks, cur)
	return blocks
}

// registerWidth returns how many 16-bit registers a type occupies.
func registerWidth(t string) uint16 {
	switch t {
	case "uint16", "int16":
		return 1
	case "int32", "float32", "uint32":
		return 2
	case "float64", "int64":
		return 4
	}
	return 1
}
One read of 50 registers is dramatically faster than 50 reads of 1 register each, both due to TCP round-trip and PLC processing time.
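Once a block has been read, each tag’s value has to be sliced back out of the returned bytes. A minimal sketch of that step, assuming big-endian layout and the same type names as ModbusTag.Type; decodeAt is a hypothetical helper that takes the block’s start register and the tag’s register and works out the byte offset:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// decodeAt extracts one value from the raw bytes of a coalesced read.
// blockStart is the first register of the block, register is the tag's
// address, and buf is what ReadHoldingRegisters returned for the block.
func decodeAt(buf []byte, blockStart, register uint16, typ string) (float64, error) {
	var width int // bytes occupied by the value
	switch typ {
	case "uint16", "int16":
		width = 2
	case "int32", "float32":
		width = 4
	default:
		return 0, fmt.Errorf("decode: unsupported type %q", typ)
	}
	if register < blockStart {
		return 0, fmt.Errorf("decode: register %d is before block start %d", register, blockStart)
	}
	off := int(register-blockStart) * 2 // two bytes per register
	if off+width > len(buf) {
		return 0, fmt.Errorf("decode: register %d is outside the block", register)
	}
	b := buf[off : off+width]
	switch typ {
	case "uint16":
		return float64(binary.BigEndian.Uint16(b)), nil
	case "int16":
		return float64(int16(binary.BigEndian.Uint16(b))), nil
	case "int32":
		return float64(int32(binary.BigEndian.Uint32(b))), nil
	default: // "float32"
		return float64(math.Float32frombits(binary.BigEndian.Uint32(b))), nil
	}
}
```

The poll loop would then issue one ReadHoldingRegisters(block.Start, block.Count) per block and call decodeAt for every tag in block.Tags, applying Scale afterwards as before.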
4. The MQTT publisher
The publisher takes Reading values and ships them to MQTT 5 with user properties for traceability.
// internal/publish/mqtt.go
package publish

import (
	"context"
	"encoding/json"
	"log/slog"
	"net"

	"github.com/eclipse/paho.golang/paho"
	"github.com/example/bridge/internal/source"
)

type MQTT struct {
	Broker   string
	ClientID string
	SiteID   string
	In       <-chan source.Reading
}

func (m *MQTT) Run(ctx context.Context) error {
	conn, err := net.Dial("tcp", m.Broker)
	if err != nil {
		return err
	}
	c := paho.NewClient(paho.ClientConfig{Conn: conn})
	_, err = c.Connect(ctx, &paho.Connect{
		ClientID:  m.ClientID,
		KeepAlive: 30,
	})
	if err != nil {
		return err
	}
	// Tell the broker we are leaving on purpose when Run returns.
	defer c.Disconnect(&paho.Disconnect{ReasonCode: 0})
	slog.Info("mqtt publisher connected", "broker", m.Broker)

	for {
		select {
		case <-ctx.Done():
			return nil
		case r, ok := <-m.In:
			if !ok {
				return nil
			}
			payload, err := json.Marshal(r)
			if err != nil {
				// NaN and Inf values cannot be encoded as JSON numbers.
				slog.Warn("marshal failed", "tag", r.Tag, "err", err)
				continue
			}
			topic := "site/" + m.SiteID + "/device/" + r.DeviceID + "/tag/" + r.Tag
			_, err = c.Publish(ctx, &paho.Publish{
				Topic:   topic,
				QoS:     1,
				Payload: payload,
				Properties: &paho.PublishProperties{
					ContentType: "application/json",
					User: []paho.UserProperty{
						{Key: "source", Value: r.Source},
						{Key: "quality", Value: r.Quality},
					},
					MessageExpiry: ptr32(3600), // seconds
				},
			})
			if err != nil {
				slog.Warn("publish failed", "topic", topic, "err", err)
			}
		}
	}
}

func ptr32(v uint32) *uint32 { return &v }
QoS 1 for most industrial telemetry. QoS 2 if you cannot tolerate a duplicate (rare). QoS 0 if you don’t care about delivery (fast-moving sensors that the next sample will fix anyway).
5. Wiring it together
// cmd/bridge/main.go
package main

import (
	"context"
	"log/slog"
	"os/signal"
	"syscall"
	"time"

	"github.com/example/bridge/internal/publish"
	"github.com/example/bridge/internal/source"
)

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	readings := make(chan source.Reading, 1024)

	opcua := &source.OPCUA{
		Cfg: source.OPCUAConfig{
			Endpoint:     "opc.tcp://10.0.0.10:4840",
			SecurityMode: "SignAndEncrypt",
			SecurityPol:  "Basic256Sha256",
			CertFile:     "/etc/bridge/client.crt",
			KeyFile:      "/etc/bridge/client.key",
		},
		DeviceID: "plc-01",
		Tags: []source.OPCUATag{
			{NodeID: "ns=2;s=Temperature1", Name: "temperature_1", Unit: "C"},
			{NodeID: "ns=2;s=Pressure1", Name: "pressure_1", Unit: "bar"},
		},
		Out: readings,
	}

	modbus := &source.Modbus{
		Cfg: source.ModbusConfig{
			Address: "10.0.0.20:502", SlaveID: 1, Timeout: 2 * time.Second,
		},
		DeviceID: "vfd-01",
		Tags: []source.ModbusTag{
			{Name: "rpm", Register: 100, Type: "uint16", Scale: 1.0, Unit: "rpm"},
			{Name: "current", Register: 102, Type: "float32", Scale: 1.0, Unit: "A"},
		},
		Interval: 500 * time.Millisecond,
		Out:      readings,
	}

	mqtt := &publish.MQTT{
		Broker: "broker.local:1883", ClientID: "bridge-1", SiteID: "siteA", In: readings,
	}

	go func() {
		if err := opcua.Run(ctx); err != nil {
			slog.Error("opcua", "err", err)
		}
	}()
	go func() {
		if err := modbus.Run(ctx); err != nil {
			slog.Error("modbus", "err", err)
		}
	}()
	if err := mqtt.Run(ctx); err != nil {
		slog.Error("mqtt", "err", err)
	}
}
That’s the bridge. About 600 lines including the parts not shown (config loading, metrics). Production-ready for a single gateway.
6. Common Pitfalls
Pitfall 1, polling Modbus too aggressively
A PLC’s CPU is doing real work running the control loop. If you hammer it with Modbus reads, you can slow the control loop down enough to cause problems. Talk to the controls engineer before setting your polling interval. Sub-100ms intervals are almost never appropriate.
Pitfall 2, ignoring OPC UA timestamps
Each OPC UA value comes with SourceTimestamp (when the value was sampled) and ServerTimestamp (when the server processed it). Always use SourceTimestamp. The ServerTimestamp includes queueing and is misleading for any latency analysis.
Pitfall 3, byte-order in Modbus
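Modbus fixes the byte order inside a single 16-bit register (big-endian) but says nothing about how a 32-bit value is split across two registers. Different vendors use big-endian, little-endian, word-swapped, and byte-swapped layouts. The read code in this post assumes big-endian throughout, which is usually right but not always. Test with known values before going to production.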
Pitfall 4, no certificate rotation plan
OPC UA self-signed certs typically have 1-year validity. If you don’t have a rotation plan, you’ll wake up one day to find the bridge can’t connect. Set up a calendar reminder, or use a certificate management tool that auto-renews.
7. Troubleshooting
OPC UA connect fails with “BadSecurityChecksFailed”
The server doesn’t trust your client cert. On most OPC UA servers, your first connection puts the cert in a “rejected” folder. The admin moves it to “trusted” and you try again. Some servers have a button labeled “trust all clients” for testing; do not leave it on.
Modbus reads return zeros for the first second after connect
Some PLCs need a moment to populate response buffers after a TCP connect. Add a time.Sleep(500*time.Millisecond) after handler.Connect(), or do a discard read to prime the buffer.
Bridge publishes burst on reconnect after broker outage
Backlog buildup during the outage. Cap the upstream channel and accept that you’ll drop data during broker downtime. The alternative (unbounded buffer) eats memory until you OOM. Drops you can monitor; OOMs you can’t.
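Monitoring drops means counting them, which the silent default branches in the source code do not do. A minimal sketch of a non-blocking sender that keeps a drop counter; boundedSender is a hypothetical type, and how the counter gets exported (log line, Prometheus gauge) is left open.

```go
package main

import "sync/atomic"

// boundedSender wraps a bounded channel with a non-blocking Send that
// counts every value it has to discard.
type boundedSender[T any] struct {
	out     chan<- T
	dropped atomic.Uint64
}

// Send enqueues v if there is room and reports whether it was accepted.
// When the channel is full the value is dropped and the counter bumped.
func (s *boundedSender[T]) Send(v T) bool {
	select {
	case s.out <- v:
		return true
	default:
		s.dropped.Add(1)
		return false
	}
}

// Dropped returns the number of values discarded so far.
func (s *boundedSender[T]) Dropped() uint64 {
	return s.dropped.Load()
}
```

Both sources could share one sender around the readings channel, so a single counter reflects total loss during a broker outage. A rising Dropped() while the MQTT connection is down is expected; a rising one while it is up means the publisher cannot keep pace.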
8. Wrapping Up
A bridge from industrial protocols to MQTT is mostly about being patient with old gear. OPC UA gives you a sane subscription model and security. Modbus gives you nothing, so you build the rest yourself: polling, coalescing, byte-order handling. The Go ecosystem has good-enough libraries for both. Put a strict schema on the MQTT side and you’ve got a building block.
The next post in this series goes to the smallest devices: deploying models with TFLite Micro on microcontrollers. Once the bridge feeds the broker and the broker feeds the inference workers, you sometimes need inference where there’s no Linux at all.
Specs: OPC UA reference for the protocol, Modbus.org for the Modbus spec.