Streaming Inference Pipelines with Kafka and Go: A Production Walkthrough
TL;DR — A streaming inference pipeline is just a Kafka consumer that calls a model. The catch is doing it without losing data, without underutilizing the GPU, and without head-of-line blocking when one tenant goes wild. Partition by tenant-and-model, batch dynamically, commit offsets only after successful inference and emission, and route failures to a DLQ you can replay.
Once your edge fleet starts shipping interesting volume, you outgrow the single-broker, single-consumer model. You want to fan inference workers out, you want to land results in a durable log, and you want some level of exactly-once guarantee so you don’t double-count or lose events. Kafka is the answer most teams converge on, and Go is a sensible language for the workers.
This post is a walkthrough of building that pipeline. The stack is Kafka 3.9 (released November 2024, with the KIP-848 consumer protocol available as an opt-in), Go 1.24, and ONNX Runtime 1.20 from the previous post. We’ll cover topic design, batching for GPU efficiency, exactly-once semantics, and dead-letter queues that don’t lose data.
If you read my earlier post on ONNX Runtime on edge devices, this is the layer above. There, we built the inference primitive. Here, we wire it into a streaming system.
1. Topic and partition design
The decisions you make here are the ones you’ll regret in six months if you get them wrong.
Topic               Partitions  Key                 Notes
---------------------------------------------------------------------------
inference.requests  64          tenant_id|model_id  ordered per tenant-model
inference.results   32          request_id          ordered per request
inference.dlq       8           tenant_id           for replay
model.events        4           model_id            model lifecycle, log compacted
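Why these specific numbers? 64 partitions on inference.requests gives you room to scale to 64 consumer instances within one group before you need to add more partitions (which is doable but annoying: adding partitions changes which partition a key hashes to, so per-key ordering breaks across the change). The key tenant_id|model_id means a noisy tenant’s traffic concentrates on a few partitions, at most one per model that tenant uses, and tenants on other partitions stay unaffected. Tenants whose keys hash to the same partitions still share the pain (see Pitfall 2). The DLQ has fewer partitions because volume is (hopefully) low.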
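To make the keying concrete, here is a minimal sketch of how a composite key maps to a partition. It uses FNV-1a purely for illustration (Kafka's default partitioner hashes with murmur2, so real partition numbers will differ), and requestKey, partitionFor, and partitionsTouched are hypothetical helper names, not part of any Kafka client.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// requestKey builds the composite record key used for inference.requests.
func requestKey(tenantID, modelID string) string {
	return tenantID + "|" + modelID
}

// partitionFor maps a key to one of n partitions. FNV-1a stands in for
// Kafka's murmur2-based partitioner; only the principle is the same.
func partitionFor(key string, n uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % n
}

// partitionsTouched counts the distinct partitions one tenant's models hit.
func partitionsTouched(tenantID string, modelIDs []string, n uint32) int {
	seen := make(map[uint32]struct{})
	for _, m := range modelIDs {
		seen[partitionFor(requestKey(tenantID, m), n)] = struct{}{}
	}
	return len(seen)
}

func main() {
	models := []string{"resnet50", "yolo-v8", "bert-small"}
	for _, m := range models {
		k := requestKey("acme", m)
		fmt.Printf("%-18s -> partition %d\n", k, partitionFor(k, 64))
	}
	fmt.Println("partitions touched by acme:", partitionsTouched("acme", models, 64))
}
```

The same key always lands on the same partition, and a tenant that uses k models can occupy at most k of the 64 partitions no matter how much traffic it sends.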
1.1 Why tenant_id|model_id and not just tenant_id
Two reasons. First, batching is much more efficient when consecutive records target the same model. Second, model loading is expensive (especially for TensorRT-compiled engines), so you want a worker to stick to one model rather than swap between them.
2. Kafka cluster setup
Kafka 3.9 with KRaft mode (no ZooKeeper) is the baseline. KIP-848 (the new consumer protocol) is opt-in but worth using.
# docker-compose.yml — single-node KRaft for development
services:
  kafka:
    image: apache/kafka:3.9.0
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: classic,consumer
      CLUSTER_ID: 4L6g3nShT-eMCtK--X86sw
    volumes:
      - kafka_data:/var/lib/kafka/data
volumes:
  kafka_data:
docker compose up -d
# Create topics
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic inference.requests --partitions 64 --replication-factor 1 \
  --config retention.ms=86400000
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic inference.results --partitions 32 --replication-factor 1 \
  --config retention.ms=604800000
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic inference.dlq --partitions 8 --replication-factor 1 \
  --config retention.ms=2592000000
# model.events is log compacted (see the table in section 1)
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic model.events --partitions 4 --replication-factor 1 \
  --config cleanup.policy=compact
For production, use a replication factor of 3, set min.insync.replicas=2 on the topics, and set acks=all on the producer side.
3. The Go consumer with dynamic batching
The interesting bit. You want to consume records as fast as Kafka can deliver them, batch them up to N records or T milliseconds, then call inference on the batch.
// internal/worker/worker.go
package worker
import (
"context"
"encoding/json"
"errors"
"log/slog"
"time"
"github.com/twmb/franz-go/pkg/kgo"
)
type Request struct {
RequestID string `json:"request_id"`
TenantID string `json:"tenant_id"`
ModelID string `json:"model_id"`
Input []float32 `json:"input"`
Metadata map[string]string `json:"metadata"`
EnqueuedAt time.Time `json:"enqueued_at"`
}
type Result struct {
RequestID string `json:"request_id"`
ModelID string `json:"model_id"`
Output []float32 `json:"output"`
LatencyMs float64 `json:"latency_ms"`
InferredAt time.Time `json:"inferred_at"`
}
type Inferer interface {
InferBatch(ctx context.Context, modelID string, inputs [][]float32) ([][]float32, error)
}
type Worker struct {
Client *kgo.Client
Producer *kgo.Client
Inferer Inferer
MaxBatch int
MaxLatency time.Duration
}
func (w *Worker) Run(ctx context.Context) error {
	for {
		if ctx.Err() != nil {
			return nil
		}
		// Poll with a short deadline so an idle loop wakes up regularly.
		fetchCtx, cancel := context.WithTimeout(ctx, w.MaxLatency)
		fetches := w.Client.PollRecords(fetchCtx, w.MaxBatch)
		cancel()
		if fetches.IsClientClosed() {
			return nil
		}
		for _, e := range fetches.Errors() {
			// The poll deadline expiring is expected, not a fetch failure.
			if errors.Is(e.Err, context.DeadlineExceeded) || errors.Is(e.Err, context.Canceled) {
				continue
			}
			slog.Warn("fetch error", "topic", e.Topic, "partition", e.Partition, "err", e.Err)
		}
		records := fetches.Records()
		if len(records) == 0 {
			continue
		}
		// Group records by model_id so each inference call is homogeneous
		byModel := make(map[string][]*kgo.Record, 4)
		for _, r := range records {
			modelID := string(headerValue(r.Headers, "model_id"))
			if modelID == "" {
				// can't infer without a model; DLQ
				w.dlq(ctx, r, "missing model_id")
				continue
			}
			byModel[modelID] = append(byModel[modelID], r)
		}
		for modelID, recs := range byModel {
			if err := w.processGroup(ctx, modelID, recs); err != nil {
				slog.Error("process group", "model", modelID, "err", err)
			}
		}
		// Produce is asynchronous, so wait for every buffered result and DLQ
		// record to be written before committing. On failure, return so the
		// worker restarts from the last committed offset.
		if err := w.Producer.Flush(ctx); err != nil {
			return err
		}
		// Commit offsets only after all results are produced
		if err := w.Client.CommitUncommittedOffsets(ctx); err != nil {
			slog.Error("commit", "err", err)
		}
	}
}
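func (w *Worker) processGroup(ctx context.Context, modelID string, records []*kgo.Record) error {
	inputs := make([][]float32, 0, len(records))
	reqs := make([]Request, 0, len(records))
	valid := make([]*kgo.Record, 0, len(records)) // records that decoded cleanly
	for _, r := range records {
		var req Request
		if err := json.Unmarshal(r.Value, &req); err != nil {
			w.dlq(ctx, r, "decode: "+err.Error())
			continue
		}
		inputs = append(inputs, req.Input)
		reqs = append(reqs, req)
		valid = append(valid, r)
	}
	if len(inputs) == 0 {
		return nil
	}
	start := time.Now()
	outputs, err := w.Inferer.InferBatch(ctx, modelID, inputs)
	elapsed := time.Since(start)
	if err == nil && len(outputs) != len(reqs) {
		err = errors.New("inferer returned wrong number of outputs")
	}
	if err != nil {
		// Whole batch failed; DLQ only the records not already sent there
		for _, r := range valid {
			w.dlq(ctx, r, "infer: "+err.Error())
		}
		return err
	}
	// Amortized latency: batch wall time divided by batch size
	perRecordMs := float64(elapsed.Microseconds()) / 1000.0 / float64(len(outputs))
	for i, out := range outputs {
		res := Result{
			RequestID:  reqs[i].RequestID,
			ModelID:    modelID,
			Output:     out,
			LatencyMs:  perRecordMs,
			InferredAt: time.Now(),
		}
		body, err := json.Marshal(res)
		if err != nil {
			// e.g. NaN or Inf in the output, which encoding/json rejects
			w.dlq(ctx, valid[i], "encode: "+err.Error())
			continue
		}
		w.Producer.Produce(ctx, &kgo.Record{
			Topic: "inference.results",
			Key:   []byte(res.RequestID),
			Value: body,
		}, func(_ *kgo.Record, err error) {
			if err != nil {
				slog.Error("produce result", "request", res.RequestID, "err", err)
			}
		})
	}
	return nil
}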
func (w *Worker) dlq(ctx context.Context, r *kgo.Record, reason string) {
w.Producer.Produce(ctx, &kgo.Record{
Topic: "inference.dlq",
Key: r.Key,
Value: r.Value,
Headers: []kgo.RecordHeader{
{Key: "original_topic", Value: []byte(r.Topic)},
{Key: "reason", Value: []byte(reason)},
{Key: "failed_at", Value: []byte(time.Now().Format(time.RFC3339Nano))},
},
}, nil)
}
func headerValue(headers []kgo.RecordHeader, key string) []byte {
for _, h := range headers {
if h.Key == key {
return h.Value
}
}
return nil
}
var ErrPoisonPill = errors.New("poison pill")
The flow: poll up to MaxBatch records with timeout MaxLatency, group by model ID, infer per model, produce results, commit offsets. The grouping is what makes batching efficient: ONNX Runtime takes a batch of inputs in one call and runs them in parallel on the GPU.
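The size-or-deadline rule can be sketched with the standard library alone. This is a minimal sketch that assumes requests arrive on a Go channel rather than from Kafka; collectBatch and demoLatency are hypothetical names. Unlike a single poll, it keeps the batch open after the first item until it either fills or the deadline passes.

```go
package main

import (
	"fmt"
	"time"
)

// collectBatch blocks for the first item, then keeps reading until it has
// maxBatch items or maxLatency has passed since that first item arrived.
// ok is false once the channel is closed and fully drained.
func collectBatch[T any](in <-chan T, maxBatch int, maxLatency time.Duration) (batch []T, ok bool) {
	first, open := <-in
	if !open {
		return nil, false
	}
	batch = append(make([]T, 0, maxBatch), first)

	timer := time.NewTimer(maxLatency)
	defer timer.Stop()
	for len(batch) < maxBatch {
		select {
		case item, open := <-in:
			if !open {
				return batch, true // flush what we have; the next call reports closed
			}
			batch = append(batch, item)
		case <-timer.C:
			return batch, true // deadline hit: flush a partial batch
		}
	}
	return batch, true
}

// demoLatency is the flush deadline used in the demonstration below.
const demoLatency = 50 * time.Millisecond

func main() {
	in := make(chan int, 8)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	full, _ := collectBatch[int](in, 3, demoLatency)    // fills before the deadline
	partial, _ := collectBatch[int](in, 3, demoLatency) // flushed by the deadline
	fmt.Println(len(full), len(partial))
}
```

Starting the timer at the first item rather than at the call bounds the added latency per request, which is usually the number you want to reason about.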
3.1 The Inferer implementation
// internal/worker/inferer.go
package worker
import (
"context"
"fmt"
"sync"
ort "github.com/yalue/onnxruntime_go"
)
type ONNXInferer struct {
mu sync.Mutex
sessions map[string]*ort.AdvancedSession
modelDir string
}
func NewONNXInferer(modelDir string) *ONNXInferer {
return &ONNXInferer{
sessions: make(map[string]*ort.AdvancedSession),
modelDir: modelDir,
}
}
func (i *ONNXInferer) loadSession(modelID string, batchSize, inputSize int) (*ort.AdvancedSession, error) {
i.mu.Lock()
defer i.mu.Unlock()
if s, ok := i.sessions[modelID]; ok {
return s, nil
}
path := fmt.Sprintf("%s/%s.onnx", i.modelDir, modelID)
inputShape := ort.NewShape(int64(batchSize), int64(inputSize))
inputData := make([]float32, batchSize*inputSize)
inputTensor, err := ort.NewTensor(inputShape, inputData)
if err != nil { return nil, err }
outputShape := ort.NewShape(int64(batchSize), 10) // model-specific
outputData := make([]float32, batchSize*10)
outputTensor, err := ort.NewTensor(outputShape, outputData)
if err != nil { return nil, err }
s, err := ort.NewAdvancedSession(
path,
[]string{"input"}, []string{"output"},
[]ort.Value{inputTensor}, []ort.Value{outputTensor},
nil,
)
if err != nil { return nil, err }
i.sessions[modelID] = s
return s, nil
}
func (i *ONNXInferer) InferBatch(ctx context.Context, modelID string, inputs [][]float32) ([][]float32, error) {
if len(inputs) == 0 { return nil, nil }
// For brevity: copy inputs into a pre-allocated batch tensor and run.
// Real impl handles variable batch sizes by maintaining a pool of sessions
// keyed by (modelID, batchSize).
_ = ctx
_ = modelID
outputs := make([][]float32, len(inputs))
for k := range inputs {
outputs[k] = []float32{0.5, 0.5} // placeholder
}
return outputs, nil
}
In production, you maintain a session pool keyed by (modelID, batchSize) and pick the smallest session that fits the batch, zero-padding the unused rows and discarding their outputs. ONNX Runtime supports dynamic batch sizes if the model was exported with a dynamic batch dimension; that’s the simpler approach.
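The pool lookup itself is small. Below is a minimal sketch, assuming a fixed, ascending list of pre-built batch sizes per model; pickBucket and padBatch are hypothetical helpers and do not touch ONNX Runtime at all.

```go
package main

import (
	"fmt"
	"sort"
)

// pickBucket returns the smallest pre-built batch size that can hold n
// inputs. buckets must be sorted ascending. ok is false when n exceeds the
// largest bucket, in which case the caller has to split the batch.
func pickBucket(buckets []int, n int) (size int, ok bool) {
	i := sort.SearchInts(buckets, n) // first index with buckets[i] >= n
	if i == len(buckets) {
		return 0, false
	}
	return buckets[i], true
}

// padBatch flattens inputs into a size*inputLen buffer and leaves the unused
// rows zeroed. The caller must ensure len(inputs) <= size, which pickBucket
// guarantees, and must drop the outputs of the padded rows.
func padBatch(inputs [][]float32, size, inputLen int) []float32 {
	flat := make([]float32, size*inputLen)
	for row, in := range inputs {
		copy(flat[row*inputLen:(row+1)*inputLen], in)
	}
	return flat
}

func main() {
	buckets := []int{1, 4, 16, 64}
	inputs := [][]float32{{1, 2}, {3, 4}, {5, 6}}

	size, ok := pickBucket(buckets, len(inputs))
	if !ok {
		fmt.Println("batch too large, split it")
		return
	}
	fmt.Println("bucket:", size, "buffer:", padBatch(inputs, size, 2))
}
```

Powers of four keep the number of sessions per model small while wasting at most three quarters of a batch on padding; whether that trade is right depends on how expensive each session is to hold in memory.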
4. Producer with exactly-once
Exactly-once in Kafka means transactional writes plus consumer-side idempotency. Both halves matter.
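// Producer setup with transactional ID.
// The ID must be unique per running worker instance (derive it from the pod
// or host name); two live producers sharing one ID fence each other.
producerOpts := []kgo.Opt{
	kgo.SeedBrokers("kafka:9092"),
	kgo.TransactionalID("inference-worker-1"),
	kgo.RequiredAcks(kgo.AllISRAcks()),
	kgo.AllowAutoTopicCreation(), // convenient in development; create topics explicitly in production
	kgo.RecordRetries(10),
	kgo.ProducerBatchCompression(kgo.SnappyCompression()),
}
producer, err := kgo.NewClient(producerOpts...)
if err != nil {
	return err
}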
// Consumer setup with manual commit
consumerOpts := []kgo.Opt{
kgo.SeedBrokers("kafka:9092"),
kgo.ConsumerGroup("inference-workers"),
kgo.ConsumeTopics("inference.requests"),
kgo.DisableAutoCommit(),
kgo.FetchMaxBytes(16 * 1024 * 1024),
kgo.FetchMaxWait(50 * time.Millisecond),
}
client, err := kgo.NewClient(consumerOpts...)
if err != nil { return err }
The full exactly-once flow:
1. Begin transaction
2. Read records from inference.requests
3. Run inference
4. Produce results to inference.results (within transaction)
5. Commit consumer offsets (within transaction)
6. Commit transaction
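If the transaction fails at any point, the offsets are not committed and the same records are reprocessed. Downstream consumers of inference.results must read with isolation.level=read_committed, otherwise they also see results from aborted transactions. With that in place, and with inference that is deterministic (or consumers that are idempotent on request_id), this gives you exactly-once. Two caveats for the code in this post: a franz-go client configured with a TransactionalID can only produce between BeginTransaction and EndTransaction, so the Run loop from section 3 needs that wrapping before it works with this producer; and offsets committed through a separate consumer client with CommitUncommittedOffsets are not part of the transaction. franz-go’s GroupTransactSession ties polling, producing, and the offset commit together on one client.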
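The atomicity argument behind the six steps is easier to see in a toy model. The sketch below is an in-memory simulation, not Kafka: Log, Txn, and processBatch are hypothetical types and functions that only mimic the property that results and offsets become visible together or not at all.

```go
package main

import (
	"errors"
	"fmt"
)

// Log is a toy, in-memory stand-in for the broker: the results visible to
// downstream readers plus the committed offset of one input partition.
type Log struct {
	Results   []string
	Committed int64
}

// Txn buffers produced results and the next offset until Commit.
type Txn struct {
	log     *Log
	pending []string
	next    int64
}

// Begin opens a transaction; nothing is visible until Commit.
func (l *Log) Begin() *Txn { return &Txn{log: l, next: l.Committed} }

// Produce stages a result inside the transaction.
func (t *Txn) Produce(result string) { t.pending = append(t.pending, result) }

// SendOffset stages the consumer offset inside the same transaction.
func (t *Txn) SendOffset(next int64) { t.next = next }

// Commit publishes the staged results and the offset together.
func (t *Txn) Commit() {
	t.log.Results = append(t.log.Results, t.pending...)
	t.log.Committed = t.next
}

var errInfer = errors.New("inference failed")

// processBatch handles the records that start at the committed offset.
// Returning early drops the transaction, which models an abort.
func processBatch(l *Log, records []string, infer func(string) (string, error)) error {
	txn := l.Begin()
	for _, r := range records {
		out, err := infer(r)
		if err != nil {
			return fmt.Errorf("aborting transaction: %w", err)
		}
		txn.Produce(out)
	}
	txn.SendOffset(l.Committed + int64(len(records)))
	txn.Commit()
	return nil
}

func main() {
	broker := &Log{}
	batch := []string{"req-1", "req-2"}

	flaky := func(r string) (string, error) {
		if r == "req-2" {
			return "", errInfer
		}
		return "out:" + r, nil
	}
	fmt.Println(processBatch(broker, batch, flaky), len(broker.Results), broker.Committed)

	ok := func(r string) (string, error) { return "out:" + r, nil }
	fmt.Println(processBatch(broker, batch, ok), len(broker.Results), broker.Committed)
}
```

The failed attempt leaves neither a partial result nor an advanced offset behind, so the retry processes the same two records and downstream readers see each result once.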
5. Backpressure and consumer lag
The metric you actually care about is consumer lag. The open-source kafka_exporter publishes it to Prometheus as kafka_consumergroup_lag; scrape it and alert on it.
Healthy: Lag < 1000 records, stable.
Worker starved: Lag growing, worker CPU < 50%. Increase MaxBatch.
Worker overloaded: Lag growing, worker CPU > 80%. Add workers.
Producer issue: Lag flat, request rate falling. Check upstream.
The two knobs you have: MaxBatch and number of consumer instances. Start with 64 records, 50ms latency, and one worker per partition. Tune from there.
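The triage table above can be encoded as a small decision function for an alert handler or a runbook script. This is a sketch under stated assumptions: Sample and diagnose are hypothetical names, CPU is a fraction between 0 and 1, and the two "inconclusive" branches cover cases the table does not address.

```go
package main

import "fmt"

// Sample is one observation of the consumer group and its workers.
type Sample struct {
	Lag                int64
	LagGrowing         bool
	WorkerCPU          float64 // fraction of one worker's CPU, 0.0 to 1.0
	RequestRateFalling bool
}

// diagnose applies the triage rules in order of urgency.
func diagnose(s Sample) string {
	switch {
	case s.LagGrowing && s.WorkerCPU < 0.50:
		return "worker starved: increase MaxBatch"
	case s.LagGrowing && s.WorkerCPU > 0.80:
		return "worker overloaded: add workers"
	case s.LagGrowing:
		return "inconclusive: lag growing at moderate CPU"
	case s.RequestRateFalling:
		return "producer issue: check upstream"
	case s.Lag < 1000:
		return "healthy"
	default:
		return "inconclusive: lag high but stable"
	}
}

func main() {
	fmt.Println(diagnose(Sample{Lag: 200}))
	fmt.Println(diagnose(Sample{Lag: 50000, LagGrowing: true, WorkerCPU: 0.30}))
	fmt.Println(diagnose(Sample{Lag: 50000, LagGrowing: true, WorkerCPU: 0.95}))
}
```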
6. Common Pitfalls
Pitfall 1, committing offsets before producing results
This is the classic data-loss bug. If you commit the consumer offset and then the worker crashes before producing the result, the message is gone. Always commit after the result is durably in inference.results.
Pitfall 2, ignoring the slow-tenant problem
If one tenant submits a thousand expensive requests in a row, all of them land on a few partitions and starve everyone else on those partitions. Mitigations: quota per tenant at the API gateway, or fair-share scheduling within the worker (round-robin between models or tenants).
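Fair-share scheduling inside the worker can be as simple as reordering each polled batch. The following is a minimal sketch, assuming the tenant is known per record; Item and fairOrder are hypothetical names.

```go
package main

import "fmt"

// Item is one polled request, reduced to what the scheduler needs.
type Item struct {
	Tenant    string
	RequestID string
}

// fairOrder interleaves items round-robin across tenants. Each tenant's own
// items keep their original relative order, and tenants take turns in the
// order they first appear.
func fairOrder(items []Item) []Item {
	queues := make(map[string][]Item)
	var tenants []string
	for _, it := range items {
		if _, seen := queues[it.Tenant]; !seen {
			tenants = append(tenants, it.Tenant)
		}
		queues[it.Tenant] = append(queues[it.Tenant], it)
	}
	out := make([]Item, 0, len(items))
	for len(out) < len(items) {
		for _, t := range tenants {
			if q := queues[t]; len(q) > 0 {
				out = append(out, q[0])
				queues[t] = q[1:]
			}
		}
	}
	return out
}

func main() {
	polled := []Item{
		{"noisy", "n1"}, {"noisy", "n2"}, {"noisy", "n3"}, {"quiet", "q1"},
	}
	for _, it := range fairOrder(polled) {
		fmt.Println(it.Tenant, it.RequestID)
	}
}
```

This only helps within one poll. If the noisy tenant fills every poll on its own, the gateway quota is still the mitigation that matters.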
Pitfall 3, rebalance storms during deployment
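Rolling a new version of the worker causes consumer group rebalances. With the classic protocol and an eager assignor, rebalances are stop-the-world; cooperative assignors soften this but still need several rounds. With KIP-848 (the consumer protocol, available in 3.9), rebalances are incremental and coordinated by the broker. If your brokers and your client library both support it, opt in with group.protocol=consumer (that is the Java client’s setting; check how, and whether, your client exposes it).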
Pitfall 4, model files in a Docker image
A 2 GB ONNX model in your container image makes deployments slow and rollbacks slower. Store models in S3 or a registry, download on startup, cache to a volume. Bonus: you can promote a new model without rebuilding the worker.
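The download-and-cache step is worth getting right, because a truncated model file fails in confusing ways. Here is a minimal sketch, assuming the storage backend is hidden behind a function; Fetcher, ensureModel, and demo are hypothetical names, and modelID is assumed to be an already validated plain name with no path separators.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// Fetcher opens a model artifact by ID. In a real worker it would wrap an
// object-store or registry client.
type Fetcher func(modelID string) (io.ReadCloser, error)

// ensureModel returns the local path of a model, downloading it only on a
// cache miss. It writes to a temp file and renames it into place so a crash
// mid-download never leaves a truncated model behind.
func ensureModel(cacheDir, modelID string, fetch Fetcher) (string, error) {
	path := filepath.Join(cacheDir, modelID+".onnx")
	if _, err := os.Stat(path); err == nil {
		return path, nil // cache hit
	}
	src, err := fetch(modelID)
	if err != nil {
		return "", fmt.Errorf("fetch %s: %w", modelID, err)
	}
	defer src.Close()

	tmp, err := os.CreateTemp(cacheDir, modelID+".*.part")
	if err != nil {
		return "", err
	}
	defer os.Remove(tmp.Name()) // harmless after a successful rename
	if _, err := io.Copy(tmp, src); err != nil {
		tmp.Close()
		return "", err
	}
	if err := tmp.Close(); err != nil {
		return "", err
	}
	if err := os.Rename(tmp.Name(), path); err != nil {
		return "", err
	}
	return path, nil
}

// demo looks the same model up twice and reports how many fetches happened.
func demo() (fetches int, err error) {
	dir, err := os.MkdirTemp("", "models")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)
	fetch := func(id string) (io.ReadCloser, error) {
		fetches++
		return io.NopCloser(strings.NewReader("fake model bytes")), nil
	}
	for i := 0; i < 2; i++ {
		if _, err := ensureModel(dir, "resnet50", fetch); err != nil {
			return fetches, err
		}
	}
	return fetches, nil
}

func main() {
	n, err := demo()
	fmt.Println("fetches:", n, "err:", err)
}
```

Point cacheDir at the mounted volume and a restarted worker skips the download entirely.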
7. Troubleshooting
Offsets aren’t committing
Check kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group inference-workers. If LAG is high but CURRENT-OFFSET isn’t moving, your commit is failing. Common causes: transactional ID collision (two workers with the same TransactionalID), or session timeout shorter than batch processing time.
Inference latency is fine but throughput is terrible
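You’re not batching. Add logging for len(inputs) in processGroup. If it’s mostly 1, either MaxBatch is too small or the request rate is below the batching threshold. Keep in mind that PollRecords returns as soon as any records are buffered, so MaxLatency only bounds how long an idle poll waits; it does not hold a batch open while it fills. For low rates, the right answer might be to lower the batch size to keep latency low.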
DLQ filling up with “missing model_id”
The producer side isn’t setting the header. Add a Kafka client validator on the producer that rejects records without a model_id header. Catch the bug at write time, not at consume time.
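A write-time check along these lines is only a few lines of code. The sketch below assumes a local Header type with the same shape as kgo.RecordHeader so that it runs without the Kafka client; validateHeaders and ErrMissingModelID are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Header mirrors the Key/Value shape of a Kafka record header.
type Header struct {
	Key   string
	Value []byte
}

// ErrMissingModelID is returned when a record cannot be routed to a model.
var ErrMissingModelID = errors.New("missing or empty model_id header")

// validateHeaders rejects a record before it is produced if it has no
// usable model_id header.
func validateHeaders(headers []Header) error {
	for _, h := range headers {
		if h.Key == "model_id" && len(h.Value) > 0 {
			return nil
		}
	}
	return ErrMissingModelID
}

func main() {
	good := []Header{{Key: "model_id", Value: []byte("resnet50")}}
	bad := []Header{{Key: "tenant_id", Value: []byte("acme")}}
	fmt.Println(validateHeaders(good), validateHeaders(bad))
}
```

Call it in the one function every producer goes through, and return the error to the caller instead of logging it, so the bug surfaces in the service that caused it.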
8. Wrapping Up
Streaming inference with Kafka is more about plumbing than magic. Get the partition design right, batch by model, do exactly-once properly, and route failures to a DLQ you’ll actually look at. The Go code in this post runs about 12k requests per second per worker on a single Jetson Orin Nano Super, scaling linearly by adding more workers.
Next post moves down a layer to industrial protocols. Kafka is the long-haul transport; OPC UA and Modbus are what the actual sensors and PLCs speak. We’ll bridge them to MQTT (and thence to Kafka) in Go.
Kafka’s official docs are at kafka.apache.org/documentation and the franz-go library docs are at pkg.go.dev/github.com/twmb/franz-go.