Bridging OPC UA and Modbus to MQTT in Go, A Step by Step Guide
April 23, 2025 · 11 min read · by Muhammad Amal programming

TL;DR — Industrial gear speaks OPC UA or Modbus, not MQTT. The bridge in this post reads from both, normalizes the values, and publishes to MQTT 5 with proper user properties. OPC UA subscriptions for fresh data, Modbus polling for old PLCs, and a strict contract on the published topic shape. Go 1.24 with gopcua and goburrow/modbus does the job in about 600 lines.

If you’ve worked with industrial customers, you’ve met the protocols. OPC UA is the modern one, encrypted and structured, supported by basically every PLC vendor since 2015. Modbus TCP is the older one, plain TCP and bytes, supported by everything ever made. Neither is MQTT. To get sensor data into your modern stack (Kafka, time-series DB, dashboards), you need a bridge.

This post walks through building that bridge in Go 1.24. We’ll connect to an OPC UA server, set up subscriptions, poll a Modbus device, normalize both into a common schema, and publish to MQTT 5. The code targets a small edge gateway (Raspberry Pi or industrial PC), but runs anywhere.

If you read my earlier post on EMQX clustering, this is the producer side. The bridge talks to the brokers we built there.

1. The bridge architecture

Two source goroutines (one OPC UA, one Modbus), one normalization channel, one MQTT publisher goroutine. Standard shape.

+-----------+    +-----------+
| OPC UA    |    | Modbus    |
| client    |    | client    |
| (sub)     |    | (poll)    |
+-----+-----+    +-----+-----+
      |                |
      v                v
   +-----------------------+
   | normalize chan        |
   | (cap=1024)            |
   +----------+------------+
              |
              v
       +-------------+
       | MQTT pub    |
       | (paho v5)   |
       +-------------+
              |
              v
        emqx://broker

The normalize channel is the seam. Anything upstream can be added or replaced (BACnet, EtherNet/IP) without changing the publisher. The publisher knows nothing about industrial protocols.

1.1 The common schema

Every reading the bridge emits looks like this:

type Reading struct {
    Source     string    `json:"source"`      // "opcua" or "modbus"
    DeviceID   string    `json:"device_id"`   // PLC identifier
    Tag        string    `json:"tag"`         // logical name
    Value      float64   `json:"value"`
    Quality    string    `json:"quality"`     // "good", "bad", "uncertain"
    Timestamp  time.Time `json:"timestamp"`
    Unit       string    `json:"unit,omitempty"`
}

The topic shape is site/{site_id}/device/{device_id}/tag/{tag}. Always. Pin this contract early, because downstream consumers will lock onto it.
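One way to keep that contract enforced in code is to build every topic through a single helper that refuses characters MQTT reserves. This is a minimal sketch: the `BuildTopic` name and the decision to reject rather than escape bad segments are assumptions for illustration, not part of the bridge shown in this post.

```go
package main

import (
    "errors"
    "fmt"
    "strings"
)

// BuildTopic is a hypothetical helper that assembles
// site/{site_id}/device/{device_id}/tag/{tag} and rejects any segment
// that would change the topic shape.
func BuildTopic(siteID, deviceID, tag string) (string, error) {
    for _, seg := range []string{siteID, deviceID, tag} {
        if seg == "" {
            return "", errors.New("empty topic segment")
        }
        // '/' would add a topic level; '+' and '#' are MQTT wildcards and
        // are not allowed in a topic name used for publishing.
        if strings.ContainsAny(seg, "/+#") {
            return "", fmt.Errorf("invalid character in topic segment %q", seg)
        }
    }
    return "site/" + siteID + "/device/" + deviceID + "/tag/" + tag, nil
}

func main() {
    topic, err := BuildTopic("siteA", "plc-01", "temperature_1")
    fmt.Println(topic, err)

    _, err = BuildTopic("siteA", "plc/01", "temperature_1")
    fmt.Println(err)
}
```

Rejecting instead of silently rewriting means a misnamed tag shows up as an error at the gateway, not as a surprise topic that no consumer subscribes to.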

2. OPC UA, the modern half

github.com/gopcua/opcua is the maintained Go OPC UA client. v0.7.x (April 2025) supports the security profiles you need.

2.1 Connection with security

// internal/source/opcua.go
package source

import (
    "context"
    "log/slog"
    "time"

    "github.com/gopcua/opcua"
    "github.com/gopcua/opcua/ua"
)

type OPCUAConfig struct {
    Endpoint     string
    SecurityMode string // "None", "Sign", "SignAndEncrypt"
    SecurityPol  string // "None", "Basic256Sha256"
    CertFile     string
    KeyFile      string
    Username     string
    Password     string
}

type OPCUA struct {
    Cfg       OPCUAConfig
    DeviceID  string
    Tags      []OPCUATag
    Out       chan<- Reading
}

type OPCUATag struct {
    NodeID string
    Name   string
    Unit   string
}

func (o *OPCUA) Run(ctx context.Context) error {
    opts := []opcua.Option{
        opcua.SecurityMode(securityMode(o.Cfg.SecurityMode)),
        opcua.SecurityPolicy(o.Cfg.SecurityPol),
        opcua.SessionTimeout(60 * time.Second),
        opcua.AutoReconnect(true),
        opcua.ReconnectInterval(2 * time.Second),
    }
    if o.Cfg.CertFile != "" {
        opts = append(opts,
            opcua.CertificateFile(o.Cfg.CertFile),
            opcua.PrivateKeyFile(o.Cfg.KeyFile),
        )
    }
    if o.Cfg.Username != "" {
        opts = append(opts, opcua.AuthUsername(o.Cfg.Username, o.Cfg.Password))
    }

    c, err := opcua.NewClient(o.Cfg.Endpoint, opts...)
    if err != nil { return err }
    if err := c.Connect(ctx); err != nil { return err }
    defer c.Close(ctx)

    slog.Info("opcua connected", "endpoint", o.Cfg.Endpoint)
    return o.subscribe(ctx, c)
}

func securityMode(s string) ua.MessageSecurityMode {
    switch s {
    case "Sign": return ua.MessageSecurityModeSign
    case "SignAndEncrypt": return ua.MessageSecurityModeSignAndEncrypt
    default: return ua.MessageSecurityModeNone
    }
}

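A note on certificates. The opc.tcp transport does not use TLS; it secures the channel with its own protocol, in which client and server each present an X.509 application certificate. These are usually self-signed. The first time your client connects to a server, the server admin has to approve the client’s cert (and vice versa). This is a deployment step, not a code step.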

2.2 Subscriptions

OPC UA subscriptions are way better than polling. The server publishes only when a value changes, with a configurable deadband.

func (o *OPCUA) subscribe(ctx context.Context, c *opcua.Client) error {
    notifyCh := make(chan *opcua.PublishNotificationData, 16)
    sub, err := c.Subscribe(ctx, &opcua.SubscriptionParameters{
        Interval: 250 * time.Millisecond,
    }, notifyCh)
    if err != nil { return err }
    defer sub.Cancel(ctx)

    // Add monitored items
    nodes := make([]*ua.MonitoredItemCreateRequest, 0, len(o.Tags))
    tagByHandle := make(map[uint32]OPCUATag, len(o.Tags))
    for i, t := range o.Tags {
        nid, err := ua.ParseNodeID(t.NodeID)
        if err != nil { return err }
        handle := uint32(i + 1)
        req := opcua.NewMonitoredItemCreateRequestWithDefaults(
            nid, ua.AttributeIDValue, handle,
        )
        nodes = append(nodes, req)
        tagByHandle[handle] = t
    }
    if _, err := sub.Monitor(ctx, ua.TimestampsToReturnBoth, nodes...); err != nil {
        return err
    }

    for {
        select {
        case <-ctx.Done():
            return nil
        case n := <-notifyCh:
            if n.Error != nil {
                slog.Warn("opcua notification", "err", n.Error)
                continue
            }
            switch v := n.Value.(type) {
            case *ua.DataChangeNotification:
                for _, item := range v.MonitoredItems {
                    tag, ok := tagByHandle[item.ClientHandle]
                    if !ok { continue }
                    o.emit(item, tag)
                }
            }
        }
    }
}

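func (o *OPCUA) emit(item *ua.MonitoredItemNotification, tag OPCUATag) {
    if item.Value == nil || item.Value.Value == nil { return }
    // Accept the common numeric variant types; skip anything else.
    var val float64
    switch v := item.Value.Value.Value().(type) {
    case float64: val = v
    case float32: val = float64(v)
    case int16:   val = float64(v)
    case uint16:  val = float64(v)
    case int32:   val = float64(v)
    case uint32:  val = float64(v)
    default:
        return
    }
    // The top two bits of an OPC UA status code carry the severity:
    // 00 good, 01 uncertain, 10 bad.
    quality := "good"
    switch uint32(item.Value.Status) >> 30 {
    case 1:
        quality = "uncertain"
    case 2, 3:
        quality = "bad"
    }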
    r := Reading{
        Source:    "opcua",
        DeviceID:  o.DeviceID,
        Tag:       tag.Name,
        Value:     val,
        Quality:   quality,
        Timestamp: item.Value.SourceTimestamp,
        Unit:      tag.Unit,
    }
    select {
    case o.Out <- r:
    default:
        // backpressure: drop
    }
}

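Interval: 250ms is the requested publishing interval. The server batches changes and reports them once per interval rather than as they happen, so this is the shortest gap you should expect between batches; when nothing changes, it sends only occasional keep-alives. The server may also revise the value you request. For most industrial use cases, 250-500ms is right.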
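The deadband mentioned at the start of this section is applied by the server, through a filter on each monitored item, and the listing above does not configure one. To make the rule itself concrete, here is a client-side sketch of an absolute deadband using only the standard library. The `Deadband` type is hypothetical and is not the gopcua API; it only illustrates "report a value when it has moved more than a threshold away from the last reported value".

```go
package main

import (
    "fmt"
    "math"
)

// Deadband is a hypothetical client-side filter that mirrors the absolute
// deadband rule: a value is reported only if it differs from the last
// reported value by more than Threshold.
type Deadband struct {
    Threshold float64
    last      float64
    primed    bool
}

// Accept reports whether v should be forwarded, and remembers it if so.
// The first value is always forwarded.
func (d *Deadband) Accept(v float64) bool {
    if !d.primed || math.Abs(v-d.last) > d.Threshold {
        d.last = v
        d.primed = true
        return true
    }
    return false
}

func main() {
    d := &Deadband{Threshold: 0.5}
    for _, v := range []float64{20.0, 20.2, 20.4, 21.0, 21.3} {
        fmt.Println(v, d.Accept(v))
    }
}
```

Note that the comparison is against the last reported value, not the previous sample, so slow drift still gets through once it accumulates past the threshold.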

3. Modbus, the legacy half

Modbus TCP is brutally simple. You open a TCP connection, write a request with a function code and address, read the response. No subscriptions, no security, no nothing. You poll.

3.1 Modbus polling

// internal/source/modbus.go
package source

import (
    "context"
    "encoding/binary"
    "log/slog"
    "math"
    "time"

    "github.com/goburrow/modbus"
)

type ModbusConfig struct {
    Address  string // "10.0.0.5:502"
    SlaveID  byte
    Timeout  time.Duration
}

type ModbusTag struct {
    Name       string
    Register   uint16
    Type       string  // "uint16", "int16", "float32", "int32"
    Scale      float64 // multiply raw by this
    Unit       string
}

type Modbus struct {
    Cfg      ModbusConfig
    DeviceID string
    Tags     []ModbusTag
    Interval time.Duration
    Out      chan<- Reading
}

func (m *Modbus) Run(ctx context.Context) error {
    handler := modbus.NewTCPClientHandler(m.Cfg.Address)
    handler.Timeout = m.Cfg.Timeout
    handler.SlaveId = m.Cfg.SlaveID
    if err := handler.Connect(); err != nil { return err }
    defer handler.Close()

    client := modbus.NewClient(handler)
    slog.Info("modbus connected", "addr", m.Cfg.Address)

    ticker := time.NewTicker(m.Interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return nil
        case t := <-ticker.C:
            for _, tag := range m.Tags {
                v, err := m.read(client, tag)
                if err != nil {
                    slog.Warn("modbus read", "tag", tag.Name, "err", err)
                    continue
                }
                r := Reading{
                    Source:    "modbus",
                    DeviceID:  m.DeviceID,
                    Tag:       tag.Name,
                    Value:     v * tag.Scale,
                    Quality:   "good",
                    Timestamp: t,
                    Unit:      tag.Unit,
                }
                select {
                case m.Out <- r:
                default:
                }
            }
        }
    }
}

func (m *Modbus) read(client modbus.Client, tag ModbusTag) (float64, error) {
    switch tag.Type {
    case "uint16":
        b, err := client.ReadHoldingRegisters(tag.Register, 1)
        if err != nil { return 0, err }
        return float64(binary.BigEndian.Uint16(b)), nil
    case "int16":
        b, err := client.ReadHoldingRegisters(tag.Register, 1)
        if err != nil { return 0, err }
        return float64(int16(binary.BigEndian.Uint16(b))), nil
    case "int32":
        b, err := client.ReadHoldingRegisters(tag.Register, 2)
        if err != nil { return 0, err }
        return float64(int32(binary.BigEndian.Uint32(b))), nil
    case "float32":
        b, err := client.ReadHoldingRegisters(tag.Register, 2)
        if err != nil { return 0, err }
        u := binary.BigEndian.Uint32(b)
        return float64(math.Float32frombits(u)), nil
    }
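    // Fail loudly on an unknown type instead of publishing a misleading zero.
    return 0, unsupportedTypeError(tag.Type)
}

// unsupportedTypeError reports a tag whose Type the bridge cannot decode.
type unsupportedTypeError string

func (e unsupportedTypeError) Error() string {
    return "modbus: unsupported tag type " + string(e)
}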

The Modbus library doesn’t give you a great context-aware interface; that’s the protocol’s age showing. If you need cancellation, run the call in a goroutine and select on ctx.Done(); handler.Timeout still bounds each individual request.

3.2 Coalescing register reads

A real Modbus PLC has a request rate ceiling, often around 50 ops/sec. If you’ve got 30 tags polling every 500ms, you’re at 60 ops/sec and will start failing requests. The fix: coalesce contiguous registers into a single read.

// planReads groups tags by contiguous register ranges and returns
// a plan of (start, count) reads to issue.
type readBlock struct {
    Start uint16
    Count uint16
    Tags  []ModbusTag
}

func planReads(tags []ModbusTag) []readBlock {
    if len(tags) == 0 { return nil }
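    // Sort a copy by register so contiguous tags end up adjacent.
    // (Needs "sort" in this file's import block.)
    sorted := append([]ModbusTag(nil), tags...)
    sort.Slice(sorted, func(i, j int) bool { return sorted[i].Register < sorted[j].Register })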
    var blocks []readBlock
    cur := readBlock{Start: sorted[0].Register, Tags: []ModbusTag{sorted[0]}}
    cur.Count = registerWidth(sorted[0].Type)
    for i := 1; i < len(sorted); i++ {
        t := sorted[i]
        endOfCur := cur.Start + cur.Count
        if t.Register == endOfCur && cur.Count+registerWidth(t.Type) <= 125 {
            cur.Count += registerWidth(t.Type)
            cur.Tags = append(cur.Tags, t)
        } else {
            blocks = append(blocks, cur)
            cur = readBlock{Start: t.Register, Count: registerWidth(t.Type), Tags: []ModbusTag{t}}
        }
    }
    blocks = append(blocks, cur)
    return blocks
}

func registerWidth(t string) uint16 {
    switch t {
    case "uint16", "int16": return 1
    case "int32", "float32", "uint32": return 2
    case "float64", "int64": return 4
    }
    return 1
}

One read of 50 registers is dramatically faster than 50 reads of 1 register each, both due to TCP round-trip and PLC processing time.
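The plan only says which blocks to read; each tag’s value still has to be sliced out of the block response. Here is a sketch of that step, assuming big-endian word order and a trimmed-down copy of the tag type. The `blockTag` and `decodeAt` names are hypothetical.

```go
package main

import (
    "encoding/binary"
    "fmt"
    "math"
)

// blockTag is a trimmed stand-in for ModbusTag (an assumption for this sketch).
type blockTag struct {
    Name     string
    Register uint16
    Type     string // "uint16", "int16", "int32", "float32"
}

// decodeAt extracts one tag from the bytes of a block read that started at
// register start. Each register is two bytes, so the tag's byte offset is
// (Register - start) * 2.
func decodeAt(block []byte, start uint16, t blockTag) (float64, error) {
    if t.Register < start {
        return 0, fmt.Errorf("tag %q starts before the block", t.Name)
    }
    off := int(t.Register-start) * 2
    need := 2
    if t.Type == "int32" || t.Type == "float32" {
        need = 4
    }
    if off+need > len(block) {
        return 0, fmt.Errorf("tag %q lies outside the block", t.Name)
    }
    b := block[off : off+need]
    switch t.Type {
    case "uint16":
        return float64(binary.BigEndian.Uint16(b)), nil
    case "int16":
        return float64(int16(binary.BigEndian.Uint16(b))), nil
    case "int32":
        return float64(int32(binary.BigEndian.Uint32(b))), nil
    case "float32":
        return float64(math.Float32frombits(binary.BigEndian.Uint32(b))), nil
    }
    return 0, fmt.Errorf("tag %q: unsupported type %q", t.Name, t.Type)
}

func main() {
    // Simulated response for registers 100..102: uint16 1500, then float32 10.0.
    block := []byte{0x05, 0xDC, 0x41, 0x20, 0x00, 0x00}
    rpm, _ := decodeAt(block, 100, blockTag{"rpm", 100, "uint16"})
    cur, _ := decodeAt(block, 100, blockTag{"current", 101, "float32"})
    fmt.Println(rpm, cur) // 1500 10
}
```

The bounds check matters more than it looks: a PLC that returns a short response would otherwise cause a slice panic in the polling goroutine.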

4. The MQTT publisher

The publisher takes Reading values and ships them to MQTT 5 with user properties for traceability.

// internal/publish/mqtt.go
package publish

import (
    "context"
    "encoding/json"
    "log/slog"
    "net"
    "time"

    "github.com/eclipse/paho.golang/paho"
    "github.com/example/bridge/internal/source"
)

type MQTT struct {
    Broker   string
    ClientID string
    SiteID   string
    In       <-chan source.Reading
}

func (m *MQTT) Run(ctx context.Context) error {
    conn, err := net.Dial("tcp", m.Broker)
    if err != nil { return err }
    c := paho.NewClient(paho.ClientConfig{Conn: conn})
    _, err = c.Connect(ctx, &paho.Connect{
        ClientID:  m.ClientID,
        KeepAlive: 30,
    })
    if err != nil { return err }
    slog.Info("mqtt publisher connected", "broker", m.Broker)

    for {
        select {
        case <-ctx.Done():
            return nil
        case r, ok := <-m.In:
            if !ok { return nil }
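            payload, err := json.Marshal(r)
            if err != nil {
                // json.Marshal rejects NaN and Inf, which sensors do produce.
                slog.Warn("marshal failed", "tag", r.Tag, "err", err)
                continue
            }
            topic := "site/" + m.SiteID + "/device/" + r.DeviceID + "/tag/" + r.Tag
            // Bound each publish so a stalled broker cannot block the loop
            // forever. The 5s value is an arbitrary starting point.
            pubCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
            _, err = c.Publish(pubCtx, &paho.Publish{
                Topic:   topic,
                QoS:     1,
                Payload: payload,
                Properties: &paho.PublishProperties{
                    ContentType: "application/json",
                    User: []paho.UserProperty{
                        {Key: "source", Value: r.Source},
                        {Key: "quality", Value: r.Quality},
                    },
                    MessageExpiry: ptr32(3600),
                },
            })
            cancel()
            if err != nil {
                slog.Warn("publish failed", "topic", topic, "err", err)
            }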
        }
    }
}

func ptr32(v uint32) *uint32 { return &v }

QoS 1 for most industrial telemetry. QoS 2 if you cannot tolerate a duplicate (rare). QoS 0 if you don’t care about delivery (fast-moving sensors that the next sample will fix anyway).

5. Wiring it together

// cmd/bridge/main.go
package main

import (
    "context"
    "log/slog"
    "os/signal"
    "syscall"
    "time"

    "github.com/example/bridge/internal/publish"
    "github.com/example/bridge/internal/source"
)

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    readings := make(chan source.Reading, 1024)

    opcua := &source.OPCUA{
        Cfg: source.OPCUAConfig{
            Endpoint:     "opc.tcp://10.0.0.10:4840",
            SecurityMode: "SignAndEncrypt",
            SecurityPol:  "Basic256Sha256",
            CertFile:     "/etc/bridge/client.crt",
            KeyFile:      "/etc/bridge/client.key",
        },
        DeviceID: "plc-01",
        Tags: []source.OPCUATag{
            {NodeID: "ns=2;s=Temperature1", Name: "temperature_1", Unit: "C"},
            {NodeID: "ns=2;s=Pressure1",    Name: "pressure_1",    Unit: "bar"},
        },
        Out: readings,
    }

    modbus := &source.Modbus{
        Cfg: source.ModbusConfig{
            Address: "10.0.0.20:502", SlaveID: 1, Timeout: 2 * time.Second,
        },
        DeviceID: "vfd-01",
        Tags: []source.ModbusTag{
            {Name: "rpm", Register: 100, Type: "uint16", Scale: 1.0, Unit: "rpm"},
            {Name: "current", Register: 102, Type: "float32", Scale: 1.0, Unit: "A"},
        },
        Interval: 500 * time.Millisecond,
        Out:      readings,
    }

    mqtt := &publish.MQTT{
        Broker: "broker.local:1883", ClientID: "bridge-1", SiteID: "siteA", In: readings,
    }

    go func() {
        if err := opcua.Run(ctx); err != nil { slog.Error("opcua", "err", err) }
    }()
    go func() {
        if err := modbus.Run(ctx); err != nil { slog.Error("modbus", "err", err) }
    }()
    if err := mqtt.Run(ctx); err != nil { slog.Error("mqtt", "err", err) }
}

That’s the bridge. About 600 lines including the parts not shown (config loading, metrics). Production-ready for a single gateway.

6. Common Pitfalls

Pitfall 1, polling Modbus too aggressively

A PLC’s CPU is doing real work running the control loop. If you hammer it with Modbus reads, you can slow the control loop down enough to cause problems. Talk to the controls engineer before setting your polling interval. Sub-100ms intervals are almost never appropriate.

Pitfall 2, ignoring OPC UA timestamps

Each OPC UA value comes with SourceTimestamp (when the value was sampled) and ServerTimestamp (when the server processed it). Always use SourceTimestamp. The ServerTimestamp includes queueing and is misleading for any latency analysis.

Pitfall 3, byte-order in Modbus

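Modbus defines the byte order inside a 16-bit register (big-endian) but not how a 32-bit value is split across two registers. Vendors ship every combination: high word first, low word first (word swap), and sometimes byte-swapped variants of each. The read function in section 3.1 assumes big-endian throughout, which is usually right but not always. Test with known values before going to production.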
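A sketch of a decoder that makes the word order explicit, so it can become a per-tag setting. The `WordOrder` type and its constant names are hypothetical labels, and the sketch assumes the bytes inside each register are big-endian; devices that also swap bytes would need two more cases.

```go
package main

import (
    "encoding/binary"
    "fmt"
    "math"
)

// WordOrder is a hypothetical per-tag setting for 32-bit values.
type WordOrder int

const (
    HighWordFirst WordOrder = iota // registers arrive as AB CD
    LowWordFirst                   // registers arrive as CD AB ("word swap")
)

// float32From decodes two registers (4 bytes) into a float32. Only the
// order of the two registers varies; each register is read big-endian.
func float32From(b []byte, order WordOrder) (float32, error) {
    if len(b) != 4 {
        return 0, fmt.Errorf("need 4 bytes, got %d", len(b))
    }
    hi, lo := b[0:2], b[2:4]
    if order == LowWordFirst {
        hi, lo = lo, hi
    }
    bits := uint32(binary.BigEndian.Uint16(hi))<<16 | uint32(binary.BigEndian.Uint16(lo))
    return math.Float32frombits(bits), nil
}

func main() {
    // 10.0 as an IEEE 754 float32 is 0x41200000.
    normal := []byte{0x41, 0x20, 0x00, 0x00}
    swapped := []byte{0x00, 0x00, 0x41, 0x20}

    a, _ := float32From(normal, HighWordFirst)
    b, _ := float32From(swapped, LowWordFirst)
    wrong, _ := float32From(swapped, HighWordFirst)
    fmt.Println(a, b, wrong)
}
```

The third value shows what a wrong setting looks like in practice: not an error, just a tiny or enormous number. That is why testing against a known value is the only reliable check.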

Pitfall 4, no certificate rotation plan

OPC UA self-signed certs typically have 1-year validity. If you don’t have a rotation plan, you’ll wake up one day to find the bridge can’t connect. Set up a calendar reminder, or use a certificate management tool that auto-renews.

7. Troubleshooting

OPC UA connect fails with “BadSecurityChecksFailed”

The server doesn’t trust your client cert. On most OPC UA servers, your first connection puts the cert in a “rejected” folder. The admin moves it to “trusted” and you try again. Some servers have a button labeled “trust all clients” for testing; do not leave it on.

Modbus reads return zeros for the first second after connect

Some PLCs need a moment to populate response buffers after a TCP connect. Add a time.Sleep(500*time.Millisecond) after handler.Connect(), or do a discard read to prime the buffer.

Bridge publishes burst on reconnect after broker outage

Backlog buildup during the outage. Cap the upstream channel and accept that you’ll drop data during broker downtime. The alternative (unbounded buffer) eats memory until you OOM. Drops you can monitor; OOMs you can’t.
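Monitoring drops means counting them, which the non-blocking sends in the listings above do not do yet. A sketch of a counted send, using only the standard library; the `trySend` helper and the `dropped` counter are hypothetical, and a real bridge would export the counter through whatever metrics system it already uses.

```go
package main

import (
    "fmt"
    "sync/atomic"
)

// dropped counts readings discarded because the channel was full.
var dropped atomic.Int64

// trySend performs a non-blocking send and records a drop when the
// channel has no room.
func trySend[T any](out chan<- T, v T) bool {
    select {
    case out <- v:
        return true
    default:
        dropped.Add(1)
        return false
    }
}

func main() {
    ch := make(chan int, 2)
    for i := 0; i < 5; i++ {
        trySend(ch, i)
    }
    fmt.Println("queued:", len(ch), "dropped:", dropped.Load()) // queued: 2 dropped: 3
}
```

An alert on the rate of change of this counter tells you about a broker outage from the gateway side, even when the broker itself is too unhealthy to report anything.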

8. Wrapping Up

A bridge from industrial protocols to MQTT is mostly about being patient with old gear. OPC UA gives you a sane subscription model and security. Modbus gives you nothing, so you build the rest yourself: polling, coalescing, byte-order handling. The Go ecosystem has good-enough libraries for both. Put a strict schema on the MQTT side and you’ve got a building block.

Next post in this series goes to the smallest devices, deploying models with TFLite Micro on microcontrollers. The bridge feeds the broker and the broker feeds the inference workers, but sometimes you need inference where there’s no Linux at all.

Specs: OPC UA reference for the protocol, Modbus.org for the Modbus spec.