Streaming Inference Pipelines with Kafka and Go, A Production Walkthrough
April 21, 2025 · 10 min read · by Muhammad Amal · programming

TL;DR — A streaming inference pipeline is just a Kafka consumer that calls a model. The catch is doing it without losing data, without underutilizing the GPU, and without head-of-line blocking when one tenant goes wild. Partition by tenant-and-model, batch dynamically, commit offsets only after successful inference and emission, and route failures to a DLQ you can replay.

Once your edge fleet starts shipping interesting volume, you outgrow the single-broker, single-consumer model. You want to fan inference workers out, you want to land results in a durable log, and you want some level of exactly-once guarantee so you don’t double-count or lose events. Kafka is the answer most teams converge on, and Go is a sensible language for the workers.

This post is a walkthrough of building that pipeline. The stack is Kafka 3.9 (released November 2024, with the KIP-848 consumer protocol available as an opt-in feature), Go 1.24, and ONNX Runtime 1.20 from the previous post. We’ll cover topic design, batching for GPU efficiency, exactly-once semantics, and dead-letter queues that don’t lose data.

If you read my earlier post on ONNX Runtime on edge devices, this is the layer above. There, we built the inference primitive. Here, we wire it into a streaming system.

1. Topic and partition design

The decisions you make here are the ones you’ll regret in six months if you get them wrong.

Topic                Partitions   Key                     Notes
--------------------------------------------------------------------
inference.requests   64           tenant_id|model_id      ordered per tenant-model
inference.results    32           request_id              ordered per request
inference.dlq        8            tenant_id               for replay
model.events         4            model_id                model lifecycle, log compacted

Why these specific numbers? 64 partitions on inference.requests gives you room to scale to 64 consumer instances within one group before you need to add more partitions (which is doable but annoying, because changing the partition count remaps keys to different partitions). The key tenant_id|model_id means a noisy tenant’s traffic concentrates on a few partitions, so only tenants whose keys hash to those same partitions are affected. The DLQ has fewer partitions because volume is (hopefully) low.

1.1 Why tenant_id|model_id and not just tenant_id

Two reasons. First, batching is much more efficient when consecutive records target the same model. Second, model loading is expensive (especially for TRT-compiled engines), so you want a worker to stick to one model rather than swap between them.
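To make the keying concrete, here is a minimal sketch of how a composite key pins a tenant-model pair to one partition. The helper names are hypothetical, and FNV-1a is only a stand-in for the murmur2 hash that Kafka's default partitioner uses, so the partition numbers printed here will not match a real cluster.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// requestKey builds the composite record key for inference.requests.
// Hypothetical helper: the post only specifies the tenant_id|model_id format.
func requestKey(tenantID, modelID string) string {
	return tenantID + "|" + modelID
}

// partitionFor maps a key to a partition by hashing it and taking the
// remainder. FNV-1a is an assumption for illustration; Kafka clients use
// murmur2 by default.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	keys := []string{
		requestKey("acme", "resnet50"),
		requestKey("acme", "yolov8"),
		requestKey("globex", "resnet50"),
	}
	for _, k := range keys {
		fmt.Printf("%-18s -> partition %d\n", k, partitionFor(k, 64))
	}
}
```

The same key always lands on the same partition, which is what gives a worker a steady stream of records for one model. Two different keys can still collide on a partition, which is the root of the slow-tenant problem covered later.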

2. Kafka cluster setup

Kafka 3.9 with KRaft mode (no ZooKeeper) is the baseline. KIP-848 (the new consumer protocol) is opt-in but worth using.

# docker-compose.yml — single-node KRaft for development
services:
  kafka:
    image: apache/kafka:3.9.0
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_LOG_DIRS: /var/lib/kafka/data
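      KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: classic,consumer
      # Single broker: internal topics can't use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1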
      CLUSTER_ID: 4L6g3nShT-eMCtK--X86sw
    volumes:
      - kafka_data:/var/lib/kafka/data
volumes:
  kafka_data:

# Start the broker
docker compose up -d

# Create topics
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic inference.requests --partitions 64 --replication-factor 1 \
  --config retention.ms=86400000

docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic inference.results --partitions 32 --replication-factor 1 \
  --config retention.ms=604800000

docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic inference.dlq --partitions 8 --replication-factor 1 \
  --config retention.ms=2592000000

For production, use replication factor 3 and min.insync.replicas=2 on the topics, and acks=all on the producer side.

3. The Go consumer with dynamic batching

The interesting bit. You want to consume records as fast as Kafka can deliver them, batch them up to N records or T milliseconds, then call inference on the batch.

// internal/worker/worker.go
package worker

import (
    "context"
    "encoding/json"
    "errors"
    "log/slog"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
)

type Request struct {
    RequestID  string            `json:"request_id"`
    TenantID   string            `json:"tenant_id"`
    ModelID    string            `json:"model_id"`
    Input      []float32         `json:"input"`
    Metadata   map[string]string `json:"metadata"`
    EnqueuedAt time.Time         `json:"enqueued_at"`
}

type Result struct {
    RequestID  string    `json:"request_id"`
    ModelID    string    `json:"model_id"`
    Output     []float32 `json:"output"`
    LatencyMs  float64   `json:"latency_ms"`
    InferredAt time.Time `json:"inferred_at"`
}

type Inferer interface {
    InferBatch(ctx context.Context, modelID string, inputs [][]float32) ([][]float32, error)
}

type Worker struct {
    Client      *kgo.Client
    Producer    *kgo.Client
    Inferer     Inferer
    MaxBatch    int
    MaxLatency  time.Duration
}

func (w *Worker) Run(ctx context.Context) error {
    for {
        if ctx.Err() != nil {
            return nil
        }
        // Poll with a short deadline so we can flush partial batches on time
        fetchCtx, cancel := context.WithTimeout(ctx, w.MaxLatency)
        fetches := w.Client.PollRecords(fetchCtx, w.MaxBatch)
        cancel()
        if fetches.IsClientClosed() {
            return nil
        }
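        if errs := fetches.Errors(); len(errs) > 0 {
            for _, e := range errs {
                // The poll deadline expiring is expected: it just means
                // "process whatever arrived", so don't log it as a failure.
                if errors.Is(e.Err, context.DeadlineExceeded) || errors.Is(e.Err, context.Canceled) {
                    continue
                }
                slog.Warn("fetch error", "topic", e.Topic, "err", e.Err)
            }
        }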

        records := fetches.Records()
        if len(records) == 0 {
            continue
        }

        // Group records by model_id so each inference call is homogeneous
        byModel := make(map[string][]*kgo.Record, 4)
        for _, r := range records {
            modelID := string(headerValue(r.Headers, "model_id"))
            if modelID == "" {
                // can't infer without a model; DLQ
                w.dlq(ctx, r, "missing model_id")
                continue
            }
            byModel[modelID] = append(byModel[modelID], r)
        }

        for modelID, recs := range byModel {
            if err := w.processGroup(ctx, modelID, recs); err != nil {
                slog.Error("process group", "model", modelID, "err", err)
            }
        }

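        // Commit offsets only after all results are produced. Produce is
        // asynchronous, so wait for the buffered records first. Flush waits
        // for them to be acknowledged or failed; per-record errors still
        // need a promise callback.
        if err := w.Producer.Flush(ctx); err != nil {
            return err
        }
        if err := w.Client.CommitUncommittedOffsets(ctx); err != nil {
            slog.Error("commit", "err", err)
        }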
    }
}

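func (w *Worker) processGroup(ctx context.Context, modelID string, records []*kgo.Record) error {
    inputs := make([][]float32, 0, len(records))
    reqs := make([]Request, 0, len(records))
    batched := make([]*kgo.Record, 0, len(records)) // records that decoded cleanly
    for _, r := range records {
        var req Request
        if err := json.Unmarshal(r.Value, &req); err != nil {
            w.dlq(ctx, r, "decode: "+err.Error())
            continue
        }
        inputs = append(inputs, req.Input)
        reqs = append(reqs, req)
        batched = append(batched, r)
    }
    if len(inputs) == 0 {
        return nil
    }
    start := time.Now()
    outputs, err := w.Inferer.InferBatch(ctx, modelID, inputs)
    elapsed := time.Since(start)
    if err == nil && len(outputs) != len(reqs) {
        err = errors.New("inferer returned a different number of outputs than inputs")
    }
    if err != nil {
        // Whole batch failed; DLQ only the records that were in it.
        // Undecodable records already went to the DLQ above.
        for _, r := range batched {
            w.dlq(ctx, r, "infer: "+err.Error())
        }
        return err
    }
    // Amortized per-record latency: batch wall time divided by batch size.
    perRecordMs := float64(elapsed.Microseconds()) / 1000.0 / float64(len(outputs))
    for i, out := range outputs {
        res := Result{
            RequestID:  reqs[i].RequestID,
            ModelID:    modelID,
            Output:     out,
            LatencyMs:  perRecordMs,
            InferredAt: time.Now(),
        }
        body, err := json.Marshal(res)
        if err != nil {
            // e.g. NaN or Inf in the output, which encoding/json rejects
            w.dlq(ctx, batched[i], "encode: "+err.Error())
            continue
        }
        w.Producer.Produce(ctx, &kgo.Record{
            Topic: "inference.results",
            Key:   []byte(res.RequestID),
            Value: body,
        }, nil)
    }
    return nil
}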

func (w *Worker) dlq(ctx context.Context, r *kgo.Record, reason string) {
    w.Producer.Produce(ctx, &kgo.Record{
        Topic: "inference.dlq",
        Key:   r.Key,
        Value: r.Value,
        Headers: []kgo.RecordHeader{
            {Key: "original_topic", Value: []byte(r.Topic)},
            {Key: "reason", Value: []byte(reason)},
            {Key: "failed_at", Value: []byte(time.Now().Format(time.RFC3339Nano))},
        },
    }, nil)
}

func headerValue(headers []kgo.RecordHeader, key string) []byte {
    for _, h := range headers {
        if h.Key == key {
            return h.Value
        }
    }
    return nil
}

var ErrPoisonPill = errors.New("poison pill")

The flow: poll up to MaxBatch records with timeout MaxLatency, group by model ID, infer per model, produce results, commit offsets. The grouping is what makes batching efficient: ONNX Runtime takes a batch of inputs in one call and runs them in parallel on the GPU.

3.1 The Inferer implementation

// internal/worker/inferer.go
package worker

import (
    "context"
    "fmt"
    "sync"

    ort "github.com/yalue/onnxruntime_go"
)

type ONNXInferer struct {
    mu       sync.Mutex
    sessions map[string]*ort.AdvancedSession
    modelDir string
}

func NewONNXInferer(modelDir string) *ONNXInferer {
    return &ONNXInferer{
        sessions: make(map[string]*ort.AdvancedSession),
        modelDir: modelDir,
    }
}

func (i *ONNXInferer) loadSession(modelID string, batchSize, inputSize int) (*ort.AdvancedSession, error) {
    i.mu.Lock()
    defer i.mu.Unlock()
    // NOTE: the cache is keyed by modelID only, so the first batchSize wins.
    if s, ok := i.sessions[modelID]; ok {
        return s, nil
    }
    path := fmt.Sprintf("%s/%s.onnx", i.modelDir, modelID)
    inputShape := ort.NewShape(int64(batchSize), int64(inputSize))
    inputData := make([]float32, batchSize*inputSize)
    inputTensor, err := ort.NewTensor(inputShape, inputData)
    if err != nil {
        return nil, err
    }
    outputShape := ort.NewShape(int64(batchSize), 10) // model-specific
    outputData := make([]float32, batchSize*10)
    outputTensor, err := ort.NewTensor(outputShape, outputData)
    if err != nil {
        inputTensor.Destroy() // don't leak the native tensor
        return nil, err
    }
    s, err := ort.NewAdvancedSession(
        path,
        []string{"input"}, []string{"output"},
        []ort.Value{inputTensor}, []ort.Value{outputTensor},
        nil,
    )
    if err != nil {
        inputTensor.Destroy()
        outputTensor.Destroy()
        return nil, err
    }
    i.sessions[modelID] = s
    return s, nil
}

func (i *ONNXInferer) InferBatch(ctx context.Context, modelID string, inputs [][]float32) ([][]float32, error) {
    if len(inputs) == 0 { return nil, nil }
    // For brevity: copy inputs into a pre-allocated batch tensor and run.
    // Real impl handles variable batch sizes by maintaining a pool of sessions
    // keyed by (modelID, batchSize).
    _ = ctx
    _ = modelID
    outputs := make([][]float32, len(inputs))
    for k := range inputs {
        outputs[k] = []float32{0.5, 0.5} // placeholder
    }
    return outputs, nil
}

In production, you maintain a session pool keyed by (modelID, batchSize) and pick the smallest session that fits the batch, padding the unused rows. ONNX Runtime supports dynamic batch sizes if the model was exported with a dynamic batch dimension; that’s the simpler approach.
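A minimal sketch of the bucket selection for such a pool, assuming fixed-shape sessions compiled for a handful of batch sizes. pickBucket and padBatch are hypothetical helpers, and the bucket sizes are illustrative.

```go
package main

import (
	"fmt"
	"sort"
)

// pickBucket returns the smallest compiled batch size that can hold n inputs.
// If n exceeds every bucket it returns the largest one, and the caller is
// expected to split the batch. buckets must be non-empty.
func pickBucket(buckets []int, n int) int {
	sorted := append([]int(nil), buckets...) // copy so the caller's slice is untouched
	sort.Ints(sorted)
	for _, b := range sorted {
		if b >= n {
			return b
		}
	}
	return sorted[len(sorted)-1]
}

// padBatch appends zero-filled rows until the batch has size rows, matching
// the fixed shape the session was created with. A batch already at or above
// size is returned with its rows unchanged. Callers drop the outputs that
// correspond to padded rows.
func padBatch(inputs [][]float32, size, inputSize int) [][]float32 {
	padded := make([][]float32, 0, size)
	padded = append(padded, inputs...)
	for len(padded) < size {
		padded = append(padded, make([]float32, inputSize))
	}
	return padded
}

func main() {
	buckets := []int{1, 8, 32, 64}
	inputs := make([][]float32, 5)
	for i := range inputs {
		inputs[i] = make([]float32, 4)
	}
	size := pickBucket(buckets, len(inputs))
	batch := padBatch(inputs, size, 4)
	fmt.Printf("%d inputs -> bucket %d, padded to %d rows\n", len(inputs), size, len(batch))
}
```

Padding wastes some compute on the filler rows, which is the trade-off against keeping one session per exact batch size.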

4. Producer with exactly-once

Exactly-once in Kafka means transactional writes plus consumer-side idempotency. Both halves matter.

// Producer setup with transactional ID
producerOpts := []kgo.Opt{
    kgo.SeedBrokers("kafka:9092"),
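    // Must be unique per worker instance and stable across restarts.
    // With a transactional ID set, records can only be produced inside a transaction.
    kgo.TransactionalID("inference-worker-1"),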
    kgo.RequiredAcks(kgo.AllISRAcks()),
    kgo.AllowAutoTopicCreation(),
    kgo.RecordRetries(10),
    kgo.ProducerBatchCompression(kgo.SnappyCompression()),
}
producer, err := kgo.NewClient(producerOpts...)
if err != nil { return err }

// Consumer setup with manual commit
consumerOpts := []kgo.Opt{
    kgo.SeedBrokers("kafka:9092"),
    kgo.ConsumerGroup("inference-workers"),
    kgo.ConsumeTopics("inference.requests"),
    kgo.DisableAutoCommit(),
    kgo.FetchMaxBytes(16 * 1024 * 1024),
    kgo.FetchMaxWait(50 * time.Millisecond),
}
client, err := kgo.NewClient(consumerOpts...)
if err != nil { return err }

The full exactly-once flow:

1. Begin transaction
2. Read records from inference.requests
3. Run inference
4. Produce results to inference.results (within transaction)
5. Commit consumer offsets (within transaction)
6. Commit transaction

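If the transaction fails at any point, it is aborted: the offsets are not committed, and once the consumer rewinds to its last committed position the same records are reprocessed. As long as downstream consumers of inference.results read with isolation.level=read_committed (so they never see aborted results) and inference is deterministic (or those consumers are idempotent on request_id), this gives you exactly-once.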

5. Backpressure and consumer lag

The metric you actually care about is consumer lag. The community kafka_exporter exposes it to Prometheus as kafka_consumergroup_lag; scrape it and alert on it.

Healthy:                Lag < 1000 records, stable.
Worker starved:         Lag growing, worker CPU < 50%. Increase MaxBatch.
Worker overloaded:      Lag growing, worker CPU > 80%. Add workers.
Producer issue:         Lag flat, request rate falling. Check upstream.

The knobs you have: MaxBatch, MaxLatency, and the number of consumer instances. Start with 64 records, 50ms latency, and one worker per partition. Tune from there.
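The table above translates fairly directly into an alerting rule. Here is a minimal sketch in Go; the Sample type, its field names, and the assumption that the deltas have already been smoothed over a window are all mine, while the thresholds are the ones from the table.

```go
package main

import "fmt"

// Sample is a hypothetical snapshot of one monitoring window.
type Sample struct {
	Lag          int64   // current consumer lag, in records
	LagDelta     int64   // change in lag over the window (smoothed; 0 means flat)
	WorkerCPU    float64 // worker CPU utilization, 0.0 to 1.0
	RequestDelta float64 // change in incoming request rate over the window
}

// diagnose maps a sample to one of the rows in the table.
func diagnose(s Sample) string {
	switch {
	case s.LagDelta > 0 && s.WorkerCPU < 0.5:
		return "worker starved: increase MaxBatch"
	case s.LagDelta > 0 && s.WorkerCPU > 0.8:
		return "worker overloaded: add workers"
	case s.LagDelta == 0 && s.RequestDelta < 0:
		return "producer issue: check upstream"
	case s.LagDelta == 0 && s.Lag < 1000:
		return "healthy"
	default:
		return "inconclusive: look at the dashboards"
	}
}

func main() {
	samples := []Sample{
		{Lag: 200},
		{Lag: 5000, LagDelta: 300, WorkerCPU: 0.30},
		{Lag: 5000, LagDelta: 300, WorkerCPU: 0.95},
		{Lag: 200, RequestDelta: -50},
	}
	for _, s := range samples {
		fmt.Printf("%+v => %s\n", s, diagnose(s))
	}
}
```

The default branch matters: lag growing with CPU between 50% and 80% does not match any row, and the rule says so rather than guessing.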

6. Common Pitfalls

Pitfall 1, committing offsets before producing results

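This is the classic data-loss bug. If you commit the consumer offset and then the worker crashes before producing the result, the message is gone. Always commit after the result is durably in inference.results, which with an asynchronous producer means after the broker has acknowledged the write, not merely after the record was enqueued.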

Pitfall 2, ignoring the slow-tenant problem

If one tenant submits a thousand expensive requests in a row, all of them land on a few partitions and starve everyone else on those partitions. Mitigations: quota per tenant at the API gateway, or fair-share scheduling within the worker (round-robin between models or tenants).
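A minimal sketch of the in-worker option, assuming the worker reorders each polled batch before running inference. Item and roundRobin are hypothetical names; per-tenant order is preserved, and only the interleaving between tenants changes.

```go
package main

import "fmt"

// Item is a hypothetical decoded request tagged with its tenant.
type Item struct {
	Tenant    string
	RequestID string
}

// roundRobin reorders a polled batch so tenants take turns, one request per
// turn, while keeping each tenant's own requests in their original order.
func roundRobin(items []Item) []Item {
	queues := make(map[string][]Item)
	var tenants []string // first-seen order, so the result is deterministic
	for _, it := range items {
		if _, seen := queues[it.Tenant]; !seen {
			tenants = append(tenants, it.Tenant)
		}
		queues[it.Tenant] = append(queues[it.Tenant], it)
	}

	out := make([]Item, 0, len(items))
	for len(out) < len(items) {
		for _, t := range tenants {
			if q := queues[t]; len(q) > 0 {
				out = append(out, q[0])
				queues[t] = q[1:]
			}
		}
	}
	return out
}

func main() {
	batch := []Item{
		{"noisy", "n1"}, {"noisy", "n2"}, {"noisy", "n3"}, {"quiet", "q1"},
	}
	for _, it := range roundRobin(batch) {
		fmt.Print(it.RequestID, " ")
	}
	fmt.Println() // n1 q1 n2 n3
}
```

This only helps within one polled batch. Because offsets are committed after the whole batch, reordering inside it does not affect delivery guarantees, but a tenant that fills every batch still needs the gateway quota.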

Pitfall 3, rebalance storms during deployment

Rolling a new version of the worker causes consumer group rebalances. With the classic protocol and an eager assignor, rebalances are stop-the-world. With KIP-848 (the consumer protocol available in 3.9), they’re incremental. If you’re on 3.9 or later and your client library supports it, opt in with group.protocol=consumer.

Pitfall 4, model files in a Docker image

A 2 GB ONNX model in your container image makes deployments slow and rollbacks slower. Store models in S3 or a registry, download on startup, cache to a volume. Bonus: you can promote a new model without rebuilding the worker.

7. Troubleshooting

Offsets aren’t committing

Check kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group inference-workers. If LAG is high but CURRENT-OFFSET isn’t moving, your commit is failing. Common causes: transactional ID collision (two workers with the same TransactionalID), or session timeout shorter than batch processing time.

Inference latency is fine but throughput is terrible

You’re not batching. Add logging for len(inputs) in processGroup. If it’s mostly 1, your MaxBatch or MaxLatency is too small, or the request rate is below the batching threshold. For low rates, the right answer might be to lower the batch size to keep latency low.

DLQ filling up with “missing model_id”

The producer side isn’t setting the header. Add a Kafka client validator on the producer that rejects records without a model_id header. Catch the bug at write time, not at consume time.

8. Wrapping Up

Streaming inference with Kafka is more about plumbing than magic. Get the partition design right, batch by model, do exactly-once properly, and route failures to a DLQ you’ll actually look at. The Go code in this post runs about 12k requests per second per worker on a single Jetson Orin Nano Super, scaling linearly by adding more workers.

Next post moves down a layer to industrial protocols. Kafka is the long-haul transport; OPC UA and Modbus are what the actual sensors and PLCs speak. We’ll bridge them to MQTT (and thence to Kafka) in Go.

Kafka’s official docs are at kafka.apache.org/documentation and the franz-go library docs are at pkg.go.dev/github.com/twmb/franz-go.