Architecting Computer Vision Quality Control at the Industrial Edge
TL;DR
- Run inspection inference on the line, not in the cloud
- Budget every millisecond from shutter to PLC signal
- Treat the model as one replaceable component in a hard-real-time system
The first time I shipped a vision inspection rig to a real factory, the demo that worked flawlessly on my desk fell apart in two hours. The problem was not the model. It was a network hiccup that stalled a cloud round-trip long enough for three defective parts to sail past the reject gate. That afternoon taught me something every cloud-first engineer eventually learns: a quality control loop has a deadline, and a deadline you cannot guarantee is not a deadline at all.
Computer vision quality control at the industrial edge is a different discipline from building a clever classifier. The accuracy of the model matters, but it is table stakes. What separates a system that survives a year on the floor from one that gets ripped out in a month is architecture: deterministic latency, graceful degradation, and a clean contact with the physical machinery that actually moves parts.
This article is the system blueprint I wish I had on day one. We will design the full pipeline on an NVIDIA Jetson Orin, with YOLOv11 served through TensorRT 10, and trace a single part from the moment it triggers the camera to the moment a PLC either passes it or kicks it into a reject bin.
The Latency Budget Comes First
Before writing a line of inference code, write down the budget. On a conveyor moving 0.5 m/s with a reject actuator 400 mm downstream of the camera, you have 800 ms from capture to actuation. That sounds generous until you account for everything that has to happen.
Trigger debounce        :   5 ms
Sensor-to-shutter delay :  12 ms
Frame exposure          :   8 ms
DMA transfer to GPU     :   6 ms
Preprocess (resize/norm):   9 ms
TensorRT inference      :  22 ms
Postprocess + NMS       :   4 ms
Decision logic          :   1 ms
Fieldbus write to PLC   :  15 ms
-----------------------------------
Total committed         :  82 ms
Slack                   : 718 ms
The slack is your safety margin, and you spend it on jitter, not on the average case. A pipeline that runs in 82 ms on average but spikes past the 800 ms window once a minute will still drop parts. The architectural goal is a tight latency distribution, not a low mean. Everything below is in service of that.
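The budget arithmetic is simple enough to keep as an executable check next to the pipeline code. A minimal sketch, assuming the stage timings from the table above; `STAGES_MS`, `deadline_ms`, and `slack_ms` are illustrative names, not part of any library:

```python
from __future__ import annotations

# Stage timings copied from the budget table; adjust them to your own measurements.
STAGES_MS = {
    "trigger_debounce": 5,
    "sensor_to_shutter": 12,
    "frame_exposure": 8,
    "dma_to_gpu": 6,
    "preprocess": 9,
    "tensorrt_inference": 22,
    "postprocess_nms": 4,
    "decision_logic": 1,
    "fieldbus_write": 15,
}


def deadline_ms(belt_speed_m_s: float, actuator_distance_mm: float) -> float:
    """Time a part takes to travel from the camera to the reject actuator."""
    distance_m = actuator_distance_mm / 1000.0
    return distance_m / belt_speed_m_s * 1000.0


def slack_ms(stages: dict[str, float], deadline: float) -> float:
    """Deadline minus the committed stage times; a negative value is infeasible."""
    return deadline - sum(stages.values())


deadline = deadline_ms(belt_speed_m_s=0.5, actuator_distance_mm=400)
print(f"deadline  : {deadline:.0f} ms")
print(f"committed : {sum(STAGES_MS.values())} ms")
print(f"slack     : {slack_ms(STAGES_MS, deadline):.0f} ms")
```

Rerunning this whenever the belt speed or the actuator position changes keeps the budget honest, because the deadline scales directly with both.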
Hardware and Capture Layer
Pick an Orin module with enough headroom that you never run the GPU at 100 percent. I default to the Orin NX 16GB for single-camera lines and the AGX Orin for multi-camera cells. Lock the power mode and clocks so dynamic frequency scaling stops surprising you.
# Pin maximum, deterministic clocks on Jetson Orin
sudo nvpmodel -m 0 # MAXN power profile
sudo jetson_clocks # lock GPU/CPU/EMC clocks to max
sudo jetson_clocks --show # print current clock settings to confirm they are pinned
For capture, use a global-shutter industrial camera over GigE Vision or MIPI CSI-2. Rolling shutter smears fast-moving parts and quietly destroys your defect signal. The camera should be hardware-triggered by a photoelectric sensor, not free-running, so each frame is deterministically tied to a part.
# capture.py: hardware-triggered acquisition with Aravis (GigE Vision)
import gi
gi.require_version("Aravis", "0.8")
from gi.repository import Aravis
import numpy as np


class TriggeredCamera:
    def __init__(self, device_id: str):
        self.cam = Aravis.Camera.new(device_id)
        self.cam.set_pixel_format(Aravis.PIXEL_FORMAT_BAYER_RG_8)
        self.cam.set_region(0, 0, 1280, 1024)
        self.cam.set_exposure_time(8000)   # microseconds, so 8 ms
        self.cam.set_frame_rate(0)         # disable free-run
        self.cam.set_trigger("Line1")      # external hardware trigger
        self.stream = self.cam.create_stream(None, None)
        payload = self.cam.get_payload()
        for _ in range(8):                 # pre-allocate buffer pool
            self.stream.push_buffer(Aravis.Buffer.new_allocate(payload))
        self.cam.start_acquisition()

    def grab(self, timeout_us: int = 200_000) -> np.ndarray | None:
        buf = self.stream.timeout_pop_buffer(timeout_us)
        if buf is None or buf.get_status() != Aravis.BufferStatus.SUCCESS:
            if buf is not None:
                self.stream.push_buffer(buf)   # recycle the failed buffer
            return None
        w, h = buf.get_image_width(), buf.get_image_height()
        raw = np.frombuffer(buf.get_data(), dtype=np.uint8).reshape(h, w).copy()
        self.stream.push_buffer(buf)           # return buffer to pool
        return raw
The buffer pool matters. Allocating frame memory inside the hot loop is one of the most common sources of latency spikes I have profiled. Pre-allocate, reuse, never malloc on the critical path.
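The same pre-allocate-and-reuse pattern applies to any scratch memory your own code touches, not only to camera buffers. A minimal sketch of a fixed-size pool using only the standard library; `BufferPool` and its methods are hypothetical names, not an Aravis or CUDA API:

```python
from __future__ import annotations

from collections import deque


class BufferPool:
    """Fixed set of equally sized buffers, allocated once at startup."""

    def __init__(self, count: int, size_bytes: int):
        # All allocation happens here, before the line starts moving.
        self._free = deque(bytearray(size_bytes) for _ in range(count))
        self.exhausted = 0  # how often the hot loop found the pool empty

    def acquire(self) -> bytearray | None:
        """Return a free buffer, or None if all are in flight (never allocates)."""
        if not self._free:
            self.exhausted += 1
            return None
        return self._free.popleft()

    def release(self, buf: bytearray) -> None:
        """Hand a buffer back so the next frame can reuse it."""
        self._free.append(buf)


pool = BufferPool(count=8, size_bytes=1280 * 1024)  # one 8-bit Bayer frame each
frame = pool.acquire()
frame[:4] = b"\x01\x02\x03\x04"  # stand-in for copying pixel data in place
pool.release(frame)
```

Counting exhaustion events instead of silently allocating a ninth buffer is a deliberate choice: a non-zero `exhausted` counter tells you the pool is undersized before the line starts dropping frames.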
Building the TensorRT Engine
YOLOv11 in raw PyTorch is fine for training, but it has no business running on the line. Export it to ONNX, then build a TensorRT 10 engine tuned for your exact input shape and precision. The full pipeline for training the model that feeds this step is covered separately, but here we assume a trained .pt checkpoint.
# 1. Export YOLOv11 to ONNX with a fixed batch size (static 1x3x640x640 input)
yolo export model=defect_yolo11s.pt format=onnx \
    opset=17 imgsz=640 batch=1 simplify=True
# 2. Build a TensorRT 10 engine with INT8 calibration.
#    No --shapes flag here: the exported input is static, and trtexec rejects
#    explicit shapes for static models.
#    --useCudaGraph only affects trtexec's own timing run, not the saved engine.
trtexec --onnx=defect_yolo11s.onnx \
    --saveEngine=defect_yolo11s.int8.engine \
    --int8 --fp16 \
    --calib=calib_cache.bin \
    --builderOptimizationLevel=4 \
    --useCudaGraph
INT8 can cut inference time by roughly half on Orin versus FP16, but it needs a calibration cache built from a few hundred representative frames. Do not skip this. An INT8 engine calibrated on internet images and deployed under factory lighting can quietly lose 8-10 points of mAP. Calibrate on frames captured from the actual rig, under the actual lights.
# build_engine.py: INT8 calibration with line-captured frames
import glob

import cv2
import numpy as np
import pycuda.autoinit  # noqa: F401  (creates the CUDA context mem_alloc needs)
import pycuda.driver as cuda
import tensorrt as trt

TRT_LOGGER = trt.Logger(trt.Logger.WARNING)


class FactoryCalibrator(trt.IInt8EntropyCalibrator2):
    def __init__(self, frame_dir: str, cache_path: str):
        super().__init__()
        self.cache_path = cache_path
        self.files = sorted(glob.glob(f"{frame_dir}/*.png"))
        self.idx = 0
        self.batch = np.zeros((1, 3, 640, 640), dtype=np.float32)
        self.d_input = cuda.mem_alloc(self.batch.nbytes)

    def get_batch_size(self):
        return 1

    def get_batch(self, names):
        if self.idx >= len(self.files):
            return None  # no frames left: calibration is finished
        img = cv2.imread(self.files[self.idx])  # OpenCV loads BGR
        # Match the runtime preprocessing exactly: RGB, 640x640, scaled to [0, 1]
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (640, 640)).astype(np.float32) / 255.0
        self.batch[0] = img.transpose(2, 0, 1)
        cuda.memcpy_htod(self.d_input, np.ascontiguousarray(self.batch))
        self.idx += 1
        return [int(self.d_input)]

    def read_calibration_cache(self):
        try:
            with open(self.cache_path, "rb") as f:
                return f.read()
        except FileNotFoundError:
            return None  # no cache yet: TensorRT will run calibration

    def write_calibration_cache(self, cache):
        with open(self.cache_path, "wb") as f:
            f.write(cache)
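Which few hundred frames go into the calibration set matters as much as how many. One option, sketched here as an assumption rather than as the method used above, is to stride evenly through a time-ordered capture so that every shift and every lighting drift is represented. `pick_calibration_frames` is a hypothetical helper, and the filenames are made up:

```python
from __future__ import annotations


def pick_calibration_frames(paths: list[str], n: int = 300) -> list[str]:
    """Pick up to n frames spread evenly across a time-ordered capture."""
    ordered = sorted(paths)  # assumes filenames sort chronologically
    if n <= 0:
        return []
    if len(ordered) <= n:
        return ordered
    step = len(ordered) / n
    # step > 1 here, so consecutive integer positions never repeat.
    return [ordered[int(i * step)] for i in range(n)]


# Usage with made-up filenames: 10,000 captured frames reduced to 300.
captured = [f"frames/part_{i:06d}.png" for i in range(10_000)]
subset = pick_calibration_frames(captured, n=300)
print(len(subset), subset[0], subset[-1])
```

Copying the selected files into the directory passed to `FactoryCalibrator` keeps the calibrator itself unchanged.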
The Inference Service
The runtime service has one job: turn frames into decisions with predictable latency. Use a CUDA graph to eliminate per-launch kernel overhead, and keep host and device buffers pinned and pre-allocated.
# inference.py: TensorRT 10 runtime with CUDA graph capture
import numpy as np
import pycuda.autoinit  # noqa: F401  (creates the CUDA context on import)
import pycuda.driver as cuda
import tensorrt as trt


class DefectDetector:
    def __init__(self, engine_path: str, conf_thresh: float = 0.45):
        self.conf = conf_thresh
        logger = trt.Logger(trt.Logger.ERROR)
        with open(engine_path, "rb") as f, trt.Runtime(logger) as rt:
            self.engine = rt.deserialize_cuda_engine(f.read())
        self.ctx = self.engine.create_execution_context()
        self.stream = cuda.Stream()
        # Allocate pinned host + device memory once
        self.in_shape = (1, 3, 640, 640)
        self.out_shape = tuple(self.ctx.get_tensor_shape("output0"))
        self.h_in = cuda.pagelocked_empty(self.in_shape, np.float32)
        self.h_out = cuda.pagelocked_empty(self.out_shape, np.float32)
        self.d_in = cuda.mem_alloc(self.h_in.nbytes)
        self.d_out = cuda.mem_alloc(self.h_out.nbytes)
        self.ctx.set_tensor_address("images", int(self.d_in))
        self.ctx.set_tensor_address("output0", int(self.d_out))
        self._capture_graph()

    def _capture_graph(self):
        # Warm up, then capture the launch sequence as a CUDA graph
        for _ in range(3):
            self.ctx.execute_async_v3(self.stream.handle)
        self.stream.synchronize()
        # The graph object is produced by end_capture(); nothing to construct first
        self.stream.begin_capture()
        cuda.memcpy_htod_async(self.d_in, self.h_in, self.stream)
        self.ctx.execute_async_v3(self.stream.handle)
        cuda.memcpy_dtoh_async(self.h_out, self.d_out, self.stream)
        self.graph_exec = self.stream.end_capture().instantiate()

    def infer(self, chw_frame: np.ndarray) -> np.ndarray:
        np.copyto(self.h_in, chw_frame)       # stage the frame in pinned memory
        self.graph_exec.launch(self.stream)   # replay copy-in, inference, copy-out
        self.stream.synchronize()
        return self._postprocess(self.h_out.copy())

    def _postprocess(self, raw: np.ndarray) -> np.ndarray:
        # raw: (1, 4+num_classes, num_anchors) for YOLOv11
        pred = raw[0].T                       # (num_anchors, 4+num_classes)
        scores = pred[:, 4:].max(axis=1)
        keep = scores > self.conf             # confidence filter only, no NMS
        boxes = pred[keep, :4]
        cls = pred[keep, 4:].argmax(axis=1)
        return np.column_stack([boxes, scores[keep], cls]) if keep.any() \
            else np.empty((0, 6), dtype=np.float32)
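The `_postprocess` method above filters on confidence but does not suppress overlapping boxes. For a pass/reject decision that is enough, since any surviving box rejects the part, but if you log or count defects you will want non-maximum suppression. A minimal greedy NMS sketch in plain Python, assuming rows of (cx, cy, w, h, score, cls) with center-format boxes; the function names and the 0.5 IoU threshold are illustrative, and on the rig you would vectorize this:

```python
def to_corners(box):
    """Convert (cx, cy, w, h) to (x1, y1, x2, y2)."""
    cx, cy, w, h = box
    return cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2


def iou(a, b) -> float:
    """Intersection over union of two (cx, cy, w, h) boxes."""
    ax1, ay1, ax2, ay2 = to_corners(a)
    bx1, by1, bx2, by2 = to_corners(b)
    iw = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    ih = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = iw * ih
    union = a[2] * a[3] + b[2] * b[3] - inter
    return inter / union if union > 0 else 0.0


def nms(dets, iou_thresh: float = 0.5):
    """Greedy per-class NMS: keep the best box, drop same-class overlaps."""
    kept = []
    for det in sorted(dets, key=lambda d: d[4], reverse=True):
        if all(det[5] != k[5] or iou(det[:4], k[:4]) <= iou_thresh for k in kept):
            kept.append(det)
    return kept


dets = [
    (100, 100, 40, 40, 0.90, 0),
    (102, 101, 40, 40, 0.75, 0),  # near-duplicate of the first box
    (300, 200, 30, 30, 0.60, 1),
]
print(nms(dets))
```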
Closing the Loop to the PLC
A vision system that prints “DEFECT” to a console is a science project. The decision has to reach the machine. Most factory floors speak Modbus TCP, EtherNet/IP, or PROFINET. Modbus is the lowest common denominator and the easiest to get right.
# actuator.py: write pass/reject decision to PLC over Modbus TCP
import logging

from pymodbus.client import ModbusTcpClient

log = logging.getLogger("actuator")


class RejectGate:
    def __init__(self, plc_host: str, coil_addr: int = 16):
        self.client = ModbusTcpClient(plc_host, port=502, timeout=0.05)
        self.coil = coil_addr
        if not self.client.connect():
            raise ConnectionError(f"PLC unreachable at {plc_host}")

    def signal(self, is_defect: bool, part_id: int) -> bool:
        try:
            rr = self.client.write_coil(self.coil, is_defect)
            if rr.isError():
                log.error("Modbus write failed for part %d", part_id)
                return False
            return True
        except Exception as exc:
            log.error("PLC write exception part %d: %s", part_id, exc)
            return False
Note the 50 ms timeout. If the PLC does not acknowledge in time, you do not get to retry forever — the part has moved. The safe default for a missed acknowledgement depends on your line: a pharmaceutical line fails closed (reject on uncertainty), a low-cost-part line may fail open. Make that policy explicit in code, never implicit.
# pipeline.py: the orchestrated hot loop
import logging
import time

import cv2

log = logging.getLogger("pipeline")


def run(cam, detector, gate):
    part_id = 0
    while True:
        raw = cam.grab()
        if raw is None:
            continue  # no trigger, idle
        part_id += 1
        t0 = time.perf_counter_ns()  # clock starts once the frame has arrived
        # GenICam BayerRG8 is an RGGB mosaic, which OpenCV names "BayerBG"
        rgb = cv2.cvtColor(raw, cv2.COLOR_BAYER_BG2RGB)
        chw = (cv2.resize(rgb, (640, 640)).astype("float32") / 255.0
               ).transpose(2, 0, 1)[None]
        dets = detector.infer(chw)
        is_defect = len(dets) > 0
        ok = gate.signal(is_defect, part_id)
        latency_ms = (time.perf_counter_ns() - t0) / 1e6
        if not ok:  # fail-closed policy: after a failed write, try to reject
            gate.signal(True, part_id)
        if latency_ms > 120:
            log.warning("part %d latency %.1f ms over budget",
                        part_id, latency_ms)
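One way to make the missed-acknowledgement policy explicit is to name it as a type and route every fallback write through a single function. A minimal sketch; `FailPolicy` and `fallback_command` are hypothetical names, and the assumption is that the reject gate is driven by one boolean:

```python
from __future__ import annotations

from enum import Enum


class FailPolicy(Enum):
    FAIL_CLOSED = "reject_on_uncertainty"  # e.g. a pharmaceutical line
    FAIL_OPEN = "pass_on_uncertainty"      # e.g. a low-cost-part line


def fallback_command(is_defect: bool | None, policy: FailPolicy) -> bool:
    """Reject flag to send after the first PLC write failed or timed out.

    is_defect is None when inference produced no usable verdict.
    """
    if policy is FailPolicy.FAIL_CLOSED:
        return True  # PLC state is unknown, so reject regardless of the verdict
    # Fail open: resend the verdict we have; no verdict means let the part pass.
    return bool(is_defect)


for verdict in (True, False, None):
    for policy in FailPolicy:
        print(verdict, policy.name, "->", fallback_command(verdict, policy))
```

In the hot loop this would replace the hard-coded `gate.signal(True, part_id)` with `gate.signal(fallback_command(is_defect, policy), part_id)`, so the line's policy lives in one configurable place.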
Common Pitfalls
- Free-running cameras. A camera that streams at a fixed FPS will hand you frames that do not line up with parts. Hardware-trigger every acquisition.
- Calibrating INT8 on the wrong data. Factory lighting is not COCO. Calibrate on rig-captured frames or accept silent accuracy loss.
- Allocating on the hot path. Every np.zeros, every buffer allocation inside the loop is a latency spike waiting for the worst possible moment.
- Ignoring thermal throttling. Orin will quietly downclock under sustained load in a hot enclosure. Run jetson_clocks and monitor tegrastats.
- Treating mean latency as the SLA. The line cares about your p99.9. A rare 500 ms spike still drops parts.
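Tracking the tail rather than the mean takes only a few lines. A minimal sketch using a nearest-rank percentile over a rolling window; `LatencyTracker`, the window size, and the sample values are illustrative assumptions:

```python
from __future__ import annotations

import math
from collections import deque


class LatencyTracker:
    """Rolling window of per-part latencies with nearest-rank percentiles."""

    def __init__(self, window: int = 10_000):
        self._samples = deque(maxlen=window)

    def record(self, latency_ms: float) -> None:
        self._samples.append(latency_ms)

    def percentile(self, p: float) -> float:
        """Smallest sample such that at least p percent of samples are <= it."""
        if not self._samples:
            raise ValueError("no samples recorded")
        ordered = sorted(self._samples)
        # Round first so float noise (999.0000000000001) does not bump the rank.
        rank = max(1, math.ceil(round(p / 100.0 * len(ordered), 9)))
        return ordered[rank - 1]

    def mean(self) -> float:
        return sum(self._samples) / len(self._samples)


tracker = LatencyTracker()
for i in range(1000):
    # 998 well-behaved parts and two 900 ms stalls
    tracker.record(900.0 if i in (250, 750) else 82.0)

print(f"mean  : {tracker.mean():.1f} ms")
print(f"p99   : {tracker.percentile(99):.1f} ms")
print(f"p99.9 : {tracker.percentile(99.9):.1f} ms")
```

With these made-up samples the mean and even the p99 look healthy, and only the p99.9 exposes the stalls, which is why the alert threshold belongs on the tail.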
Troubleshooting
- Symptom: Inference latency creeps up over hours. Cause: Thermal throttling in the enclosure. Fix: Add active cooling, monitor tegrastats, and alert on clock drops below the pinned frequency.
- Symptom: mAP drops sharply after deploying the INT8 engine. Cause: Calibration cache built from non-representative images. Fix: Rebuild the cache from a few hundred frames captured on the actual rig under production lighting.
- Symptom: Random missed parts with no logged error. Cause: Camera buffer pool exhausted, frames silently dropped. Fix: Increase the pre-allocated buffer count and return buffers to the pool immediately after copy.
- Symptom: PLC occasionally ignores reject commands. Cause: Modbus write timing out under network load. Fix: Put the vision system and PLC on an isolated VLAN, shorten cabling, and verify the fail-closed fallback fires.
- Symptom: First few frames after startup are very slow. Cause: TensorRT context and CUDA graph not yet warmed. Fix: Run several warm-up inferences before signalling the line that the system is ready.
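For the thermal case, temperature usually moves before latency does, so it is worth polling alongside tegrastats. A sketch that reads the standard Linux thermal sysfs interface, which reports millidegrees Celsius. Zone names differ between Jetson modules and JetPack releases, and the 85 C limit is an assumed placeholder, not an NVIDIA specification:

```python
from __future__ import annotations

from pathlib import Path


def read_zone_temps(root: str = "/sys/class/thermal") -> dict[str, float]:
    """Map each thermal zone's type name to its temperature in degrees C."""
    temps = {}
    for zone in sorted(Path(root).glob("thermal_zone*")):
        try:
            name = (zone / "type").read_text().strip()
            millideg = int((zone / "temp").read_text().strip())
        except (OSError, ValueError):
            continue  # zone is offline or unreadable; skip it
        temps[name] = millideg / 1000.0
    return temps


def too_hot(temps: dict[str, float], limit_c: float = 85.0) -> list[str]:
    """Names of zones at or above the alert limit (85 C is an assumed value)."""
    return sorted(name for name, t in temps.items() if t >= limit_c)


if __name__ == "__main__":
    print(too_hot(read_zone_temps()))
```

Run it from a low-priority side process rather than the hot loop, so the file reads never sit on the critical path.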
Wrapping Up
A vision quality control system lives or dies on its latency distribution and its honest contact with the machinery, not on a leaderboard mAP score. Get the latency budget, the triggered capture, the TensorRT engine, and the PLC handshake right, and the model becomes a swappable part. Next we will go one level deeper into the component everyone wants to talk about first: training the defect detection model that this pipeline serves.