Bridging OPC UA to MQTT 5, Where OT Meets IT Without Pain
August 26, 2023 · 6 min read · by Muhammad Amal

TL;DR

  • OPC UA is the OT-side protocol you can’t avoid; MQTT is the IT-side protocol everything else speaks.
  • Build a thin asyncio bridge that subscribes to OPC UA monitored items and publishes structured payloads to MQTT.
  • Security mode and certificate trust between PLC and bridge are where most deployments stall.

The OT/IT integration story is the one where most projects either over-engineer or skip a step that ruins them later. PLCs and SCADA systems speak OPC UA. The cloud, the analytics team, the broker, the dashboard — they all speak MQTT. The bridge between them is unglamorous code that, when done well, you never think about again.

I’ve shipped a few of these in 2023 and the shape that works is small: a single-purpose process per gateway that subscribes to an OPC UA server, transforms node updates into a stable JSON payload, and publishes to MQTT with predictable topic structure. This sits on the same edge gateway pattern from my K3s edge post.

OPC UA in One Paragraph for Backend Engineers

OPC UA is a request/response and subscription-based protocol with a strongly typed address space. Servers expose nodes (sensors, variables, methods) under a hierarchical namespace. Clients can browse the address space, read/write values directly, or — much more commonly for telemetry — create a Subscription and add MonitoredItems for nodes whose value changes they want pushed to them. Security has three orthogonal axes: message security mode (none / sign / sign-and-encrypt), security policy (the crypto suite), and user authentication (anonymous, username, certificate). All three need to be agreed between client and server, and certificate trust must be established in both directions.

The Bridge: Python, asyncua, paho-mqtt

I run this in Python with asyncua because OPC UA’s protocol is verbose enough that I don’t want to hand-roll it, and asyncua is the maintained pure-Python client in 2023. For a single-PLC bridge the throughput is comfortable even on a small edge gateway.

# bridge.py — Python 3.11, asyncua 1.0.4, paho-mqtt 1.6.1
import asyncio
import json
import logging
import os
import signal
from datetime import datetime, timezone
from asyncua import Client, ua
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
import paho.mqtt.client as mqtt

log = logging.getLogger("opcua-mqtt-bridge")

OPCUA_URL    = os.environ["OPCUA_URL"]            # opc.tcp://plc.local:4840
MQTT_HOST    = os.environ["MQTT_HOST"]
MQTT_PORT    = int(os.environ.get("MQTT_PORT", "1883"))
SITE_ID      = os.environ["SITE_ID"]
NODE_MAP     = json.loads(os.environ["NODE_MAP"]) # { "ns=2;i=1001": "press/inlet_temp", ... }

class MqttPublisher:
    def __init__(self, host: str, port: int):
        self.client = mqtt.Client(
            client_id=f"opcua-bridge-{SITE_ID}",
            protocol=mqtt.MQTTv5,
        )
        self.client.connect_async(host, port, keepalive=30)
        self.client.loop_start()

    def publish(self, topic: str, payload: dict, qos: int = 1):
        self.client.publish(topic, json.dumps(payload, separators=(",", ":")), qos=qos)

class SubscriptionHandler:
    def __init__(self, publisher: MqttPublisher, node_map: dict[str, str]):
        self.publisher = publisher
        self.node_map = node_map

    def datachange_notification(self, node, val, data):
        node_id = node.nodeid.to_string()
        topic_suffix = self.node_map.get(node_id)
        if not topic_suffix:
            return

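        dv = data.monitored_item.Value
        # Prefer SourceTimestamp, then ServerTimestamp, then the bridge clock.
        ts = dv.SourceTimestamp or dv.ServerTimestamp
        if ts is None:
            log.warning("no timestamp on %s, using bridge clock", node_id)
            ts = datetime.now(timezone.utc)
        elif ts.tzinfo is None:
            # Assumption: naive datetimes from the client library are UTC.
            # Without this, .timestamp() would interpret them as local time.
            ts = ts.replace(tzinfo=timezone.utc)
        sv = dv.StatusCode
        quality = 192 if sv.is_good() else (64 if sv.is_uncertain() else 0)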

        payload = {
            "device_id": SITE_ID,
            "metric": topic_suffix.split("/")[-1],
            "value": val if isinstance(val, (int, float)) else float(val),
            "ts": int(ts.timestamp() * 1000),
            "quality": quality,
        }
        topic = f"plant/{SITE_ID}/data/{topic_suffix}"
        self.publisher.publish(topic, payload, qos=1)

async def run():
    publisher = MqttPublisher(MQTT_HOST, MQTT_PORT)

    client = Client(url=OPCUA_URL)
    # Security has to be configured before the secure channel is opened,
    # so call set_security() first; entering the context manager connects.
    await client.set_security(
        SecurityPolicyBasic256Sha256,
        certificate="/etc/opcua/client-cert.pem",
        private_key="/etc/opcua/client-key.pem",
        server_certificate="/etc/opcua/server-cert.pem",
        mode=ua.MessageSecurityMode.SignAndEncrypt,
    )

    async with client:
        log.info("connected to %s", OPCUA_URL)

        handler = SubscriptionHandler(publisher, NODE_MAP)
        sub = await client.create_subscription(period=500, handler=handler)

        nodes = [client.get_node(nid) for nid in NODE_MAP.keys()]
        await sub.subscribe_data_change(
            nodes,
            queuesize=10,
            sampling_interval=500,
        )
        log.info("subscribed to %d nodes", len(nodes))

        stop = asyncio.Event()
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, stop.set)
        await stop.wait()

        await sub.delete()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    asyncio.run(run())

Worth noting:

  • sampling_interval=500. The PLC samples the underlying variable every 500ms. Lower this and you’ll see the PLC CPU climb — don’t sample faster than the downstream consumers actually use.
  • queuesize=10. The server buffers up to 10 samples per monitored item between publishes, for example while publishes are delayed by a network blip. Once the queue overflows, which samples get dropped depends on the monitored item’s DiscardOldest setting.
  • Quality mapping. The handler maps the OPC UA StatusCode onto the classic OPC quality byte convention (192 good, 64 uncertain, 0 bad). Downstream consumers should filter on this.
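
The quality mapping can also be done from the raw 32-bit StatusCode, which is handy for consumers that only see the numeric code. A minimal sketch, assuming the standard OPC UA layout where the top two bits carry the severity; quality_from_status and is_usable are hypothetical helper names, not part of asyncua:

```python
# Sketch: map a raw 32-bit OPC UA StatusCode to the 192/64/0 quality byte.
# quality_from_status and is_usable are illustrative names, not library APIs.

GOOD, UNCERTAIN, BAD = 192, 64, 0


def quality_from_status(code: int) -> int:
    """Return 192, 64 or 0 based on the severity bits (31..30) of the code."""
    severity = (code >> 30) & 0b11
    if severity == 0b00:
        return GOOD
    if severity == 0b01:
        return UNCERTAIN
    # 0b10 is Bad; 0b11 is reserved and is treated as Bad here.
    return BAD


def is_usable(payload: dict, minimum: int = GOOD) -> bool:
    """Consumer-side filter: keep samples at or above a quality floor."""
    return payload.get("quality", BAD) >= minimum
```

A consumer that tolerates uncertain readings would call is_usable(payload, minimum=64); a payload with no quality field is treated as bad.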

Node Mapping: The Boring File That Saves You

The NODE_MAP is the contract between the OT engineers (who know which ns=2;i=1001 is the inlet temperature) and the IT engineers (who only care that they get plant/east-01/data/press/inlet_temp). Keep this in version control, alongside whatever PLC program revision it matches:

{
  "ns=2;s=PressLine.Inlet.Temperature":   "press/inlet/temperature",
  "ns=2;s=PressLine.Inlet.Pressure":      "press/inlet/pressure",
  "ns=2;s=PressLine.Stroke.Position":     "press/stroke/position",
  "ns=2;s=PressLine.Cycle.CounterTotal":  "press/cycle/count",
  "ns=3;s=Machine.HMI.OperatorActive":    "machine/hmi/operator_active"
}

Use string identifiers (ns=2;s=...) over numeric ones (ns=2;i=1001) where possible. Numeric IDs get reshuffled when the PLC program is updated; string IDs are stable as long as the variable name doesn’t change.
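
Because the map is a contract, it is worth linting before the bridge starts. The following is a rough sketch of such a check, not something the bridge above already does; validate_node_map, the regular expression, and the specific rules are my assumptions about what "strict" could mean here:

```python
import re

# Assumed key shape: optional "ns=<n>;" followed by an i=, s=, g= or b= part.
NODE_ID_RE = re.compile(r"^(ns=\d+;)?(i=\d+|s=.+|g=[0-9a-fA-F-]{36}|b=.+)$")


def validate_node_map(node_map: dict[str, str]) -> list[str]:
    """Return human-readable problems; an empty list means the map looks sane."""
    problems: list[str] = []
    topics_seen: dict[str, str] = {}
    for node_id, suffix in node_map.items():
        match = NODE_ID_RE.match(node_id)
        if match is None:
            problems.append(f"{node_id!r}: not a recognisable node ID")
        elif match.group(2).startswith("i="):
            problems.append(f"{node_id!r}: numeric ID, prefer a string ID")
        # Empty levels cover "", a leading or trailing "/", and "//".
        if any(level == "" for level in suffix.split("/")):
            problems.append(f"{suffix!r}: empty topic level")
        # "+" and "#" are MQTT wildcards and are not valid in a publish topic.
        if "+" in suffix or "#" in suffix:
            problems.append(f"{suffix!r}: MQTT wildcard in topic")
        if suffix in topics_seen:
            problems.append(f"{suffix!r}: also used by {topics_seen[suffix]!r}")
        topics_seen.setdefault(suffix, node_id)
    return problems
```

Running this in CI against the version-controlled file, and again at bridge startup, keeps a typo in the map from turning into a silently missing metric.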

Security Mode Without Hand-Waving

MessageSecurityMode.SignAndEncrypt with Basic256Sha256 is the floor for industrial deployments. Two things have to happen for it to work:

  1. The bridge’s client certificate must be trusted by the OPC UA server. Either the server administrator imports the cert into the server’s trust list, or the server is configured to accept the cert on first connection (which you should disable in production).
  2. The server’s certificate must be trusted by the bridge. Pin it to a file on the bridge, don’t accept “any cert.”

I’ve watched too many deployments run MessageSecurityMode.None because the cert exchange was tedious. The protocol allows it; your security review shouldn’t. Plan for cert rotation at the design phase, not after.
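
One way to make the pin explicit, and to notice when a rotation has swapped the file underneath you, is to record the server certificate’s SHA-256 fingerprint in config and compare it at startup. This is a sketch under assumptions: pem_fingerprint and check_pinned are hypothetical helpers, the pinned file is assumed to be a single PEM certificate, and the check complements rather than replaces the trust configuration passed to set_security:

```python
import hashlib
import hmac
import ssl


def pem_fingerprint(pem_text: str) -> str:
    """SHA-256 fingerprint (lowercase hex) of a single PEM-encoded certificate."""
    der = ssl.PEM_cert_to_DER_cert(pem_text)
    return hashlib.sha256(der).hexdigest()


def check_pinned(pem_text: str, expected_sha256: str) -> bool:
    """Compare the certificate's fingerprint with the value recorded in config."""
    # Accept both "ab:cd:..." and "abcd..." notations for the expected value.
    normalised = expected_sha256.replace(":", "").lower()
    return hmac.compare_digest(pem_fingerprint(pem_text), normalised)
```

Logging the fingerprint on every start also gives you an audit trail for the rotation plan: a changed value that nobody scheduled is worth an alert.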

Sampling vs Publishing Rate

OPC UA has two timing dials: how often the server samples the underlying variable (sampling_interval), and how often the server pushes batches of changes to the client (subscription period). Confusing these is a classic mistake.

  • sampling_interval: how fast the PLC reads its own variable.
  • period: how often the server sends the changes it’s accumulated.
  • queuesize: how many samples per monitored item are buffered between publishes.

A vibration sensor that genuinely needs 1ms resolution probably shouldn’t be on OPC UA at all — it should be on a direct industrial protocol with appropriate real-time guarantees. For typical sensor telemetry (temperature, pressure, position, cycle counts), 100–500ms sampling with a 1s subscription period is reasonable.
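
The three dials are linked by simple arithmetic: in the worst case, where the value changes on every sample, roughly period / sampling_interval samples pile up per monitored item between publishes. A small sketch of that calculation; min_queue_size is a hypothetical helper and the result is a lower bound, not a recommendation:

```python
import math


def min_queue_size(sampling_interval_ms: float, period_ms: float) -> int:
    """Smallest queuesize that holds one publish interval's worth of samples."""
    if sampling_interval_ms <= 0 or period_ms <= 0:
        raise ValueError("intervals must be positive")
    # At least one slot is always needed, even if sampling is slower than publishing.
    return max(1, math.ceil(period_ms / sampling_interval_ms))
```

With 100ms sampling and a 1s period this gives 10, which matches the queuesize used in the bridge. At the bridge’s own settings (500ms sampling, 500ms period) the minimum is 1, so queuesize=10 leaves headroom for delayed publishes.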

Common Pitfalls

  • Reconnect storms after a network blip. asyncua’s client will reconnect, but you need to recreate subscriptions and re-subscribe monitored items. Wrap the subscription setup in a retry loop with backoff, and on reconnect republish a “session_start” event over MQTT so consumers can invalidate any stale state they cached.
  • PLC reboot drops your subscription silently. Some OPC UA servers don’t re-register subscriptions after a restart. Health-check by reading a server time node every minute; if reads fail, tear down and rebuild from scratch.
  • Timestamp source confusion. OPC UA gives you SourceTimestamp (when the variable changed at the source) and ServerTimestamp (when the OPC UA server observed it). Always prefer SourceTimestamp. If it’s None, fall back to ServerTimestamp. If both are None, fall back to the bridge clock and log a warning.
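  • Treating the bridge as stateless. If the bridge restarts and connects with clean_start=true, the broker discards the previous session and any in-flight messages tied to it are lost. Use clean_start=false with a stable client_id so the broker keeps the session across reconnects. With MQTT 5 the session only outlives the connection if you also set a non-zero Session Expiry Interval.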
  • One bridge for many PLCs. A single asyncio process per PLC is the right granularity. A single bridge managing 20 PLCs is one bug away from a fleet-wide outage. The OPC Foundation’s protocol overview is worth a re-read if you’re tempted to consolidate.
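
The retry loop from the first pitfall can be sketched independently of OPC UA. Here session is a hypothetical coroutine function that is expected to connect, publish the “session_start” event, create the subscription, and then block until shutdown or failure; run_with_backoff and its parameters are illustrative names, and the full-jitter exponential backoff is one reasonable choice rather than the only one:

```python
import asyncio
import logging
import random
import time
from typing import Awaitable, Callable

log = logging.getLogger("opcua-mqtt-bridge")


async def run_with_backoff(
    session: Callable[[], Awaitable[None]],
    base: float = 1.0,
    cap: float = 60.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Run session() until it returns cleanly; return the number of attempts."""
    attempts = 0
    delay = base
    while True:
        attempts += 1
        started = time.monotonic()
        try:
            await session()
            return attempts  # clean shutdown, stop retrying
        except Exception as exc:
            # Cancellation is not an Exception subclass, so it propagates.
            log.warning("session %d failed: %r", attempts, exc)
        if time.monotonic() - started > cap:
            delay = base  # the session was healthy for a while; start over
        # Full jitter keeps a fleet of bridges from reconnecting in lockstep.
        await sleep(random.uniform(0, delay))
        delay = min(cap, delay * 2)
```

The sleep parameter exists so the loop can be unit-tested without waiting; in the bridge, the body of run() after the signal-handler setup is what would move into session.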

Wrapping Up

OPC UA to MQTT is unglamorous plumbing that’s worth getting right because everything downstream depends on it. Small, single-purpose, version-controlled bridges with strict node maps beat any “universal gateway” product I’ve evaluated. In the final post in this series, I’ll get into the backpressure and reliability patterns that hold the whole pipeline together when something goes wrong.