Processing Millions of Sensor Events per Second with Go
TL;DR: Partition by sensor so events for one device land on one consumer in order. Batch aggressively, both publishing and acknowledging. Per-event work is the enemy at millions of events per second.
A single process with a ring buffer takes you a long way, but eventually the line outgrows one machine. Add a few hundred cameras and a few thousand sensors across several buildings and you are firmly in distributed-systems territory. The numbers stop being comfortable: a million events per second is roughly 86 billion a day, and at that volume every per-event allocation, every per-event syscall, every per-event ack becomes a wall.
Sensor event processing in Go at this scale is won or lost on a single principle: do work in batches, never per event. Publishing one event per network call, acknowledging one message at a time, parsing one JSON object per iteration: each is fine at a thousand events per second and fatal at a million. The architecture has to be batch-native from the producer all the way through the consumer.
This article builds a distributed sensor processing system in Go 1.24 on NATS JetStream: partitioned subjects so a device’s events stay ordered, batched publishing from the edge, and a consumer pool that pulls, processes, and acknowledges in bulk. It is the horizontal scale-out of the single-process design in the Go telemetry pipeline article.
Why NATS and Why Partitioning
NATS JetStream gives you a persistent, replicated log with pull consumers and explicit acknowledgement — the properties you need for telemetry that must not vanish on a broker restart. It is also operationally light, which matters when the system runs in an industrial environment rather than a hyperscaler.
The critical design choice is the subject hierarchy. Events for a single sensor must stay ordered, but events across sensors are independent and should parallelize freely. Encode the partition directly into the subject.
sensors.<plant>.<partition>.<sensor_id>
sensors.jakarta.07.cam-1142
sensors.jakarta.07.temp-0091
sensors.bandung.03.vib-5510
The partition number is a hash of the sensor ID modulo the partition count. Every event from one sensor lands in one partition, and one consumer owns each partition. Ordering per device is preserved; throughput scales with partition count.
The Stream
Define the JetStream stream with a partition-aware subject filter and a retention policy that fits your storage budget.
// stream.go
package events
import (
"context"
"time"
"github.com/nats-io/nats.go/jetstream"
)
const PartitionCount = 64
func EnsureStream(ctx context.Context, js jetstream.JetStream) error {
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "SENSORS",
Subjects: []string{"sensors.>"},
Retention: jetstream.LimitsPolicy,
Storage: jetstream.FileStorage,
Replicas: 3,
MaxAge: 72 * time.Hour,
MaxBytes: 500 << 30, // 500 GiB cap
Discard: jetstream.DiscardOld,
})
return err
}
// PartitionFor maps a sensor id to a stable partition.
func PartitionFor(sensorID string) uint32 {
var h uint32 = 2166136261 // FNV-1a
for i := 0; i < len(sensorID); i++ {
h ^= uint32(sensorID[i])
h *= 16777619
}
return h % PartitionCount
}
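The publisher and consumer listings in this article both call a subject(plant, part) helper that is never defined. The following is a minimal sketch of what it might look like, not the author's implementation. It assumes batches are published at partition level (a batch can hold readings from many sensors, so the sensor ID token is left off) and that the partition is zero-padded to two digits, as in the example subjects above. The three-token subject still matches the stream's sensors.> filter, and using the same string as the consumer's FilterSubject keeps publisher and consumer in agreement.

```go
package main

import "fmt"

const PartitionCount = 64

// PartitionFor is copied from stream.go: FNV-1a over the sensor id,
// reduced modulo the partition count.
func PartitionFor(sensorID string) uint32 {
	var h uint32 = 2166136261
	for i := 0; i < len(sensorID); i++ {
		h ^= uint32(sensorID[i])
		h *= 16777619
	}
	return h % PartitionCount
}

// subject is a hypothetical helper: the article uses it but does not define it.
// Assumption: one subject per plant and partition, partition zero-padded to two digits.
func subject(plant string, part uint32) string {
	return fmt.Sprintf("sensors.%s.%02d", plant, part)
}

func main() {
	// Show which partition subject each example sensor would publish to.
	for _, id := range []string{"cam-1142", "temp-0091", "vib-5510"} {
		fmt.Printf("%-10s -> %s\n", id, subject("jakarta", PartitionFor(id)))
	}
}
```

If the sensor ID were kept as a fourth subject token instead, the consumer filter would need a trailing wildcard to match it; that variant is not shown here.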
Batched Publishing From the Edge
The edge node should not publish one message per sensor reading. It accumulates readings and flushes them as a batch — either when the batch is full or when a short timer fires, whichever comes first. This bounds latency while amortizing the network cost.
// publisher.go
package events
import (
"context"
"sync"
"time"
"github.com/nats-io/nats.go/jetstream"
"google.golang.org/protobuf/proto"
)
type BatchPublisher struct {
js jetstream.JetStream
plant string
mu sync.Mutex
pending map[uint32][]*Reading // partition -> readings
maxBatch int
flushIvl time.Duration
}
func NewBatchPublisher(js jetstream.JetStream, plant string) *BatchPublisher {
return &BatchPublisher{
js: js,
plant: plant,
pending: make(map[uint32][]*Reading),
maxBatch: 1000,
flushIvl: 20 * time.Millisecond,
}
}
func (p *BatchPublisher) Add(r *Reading) {
part := PartitionFor(r.SensorId)
p.mu.Lock()
p.pending[part] = append(p.pending[part], r)
full := len(p.pending[part]) >= p.maxBatch
batch := p.pending[part]
if full {
p.pending[part] = nil
}
p.mu.Unlock()
if full {
p.flushPartition(part, batch)
}
}
func (p *BatchPublisher) Run(ctx context.Context) {
ticker := time.NewTicker(p.flushIvl)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
p.flushAll()
return
case <-ticker.C:
p.flushAll()
}
}
}
func (p *BatchPublisher) flushAll() {
p.mu.Lock()
snapshot := p.pending
p.pending = make(map[uint32][]*Reading)
p.mu.Unlock()
for part, batch := range snapshot {
if len(batch) > 0 {
p.flushPartition(part, batch)
}
}
}
func (p *BatchPublisher) flushPartition(part uint32, batch []*Reading) {
payload, err := proto.Marshal(&ReadingBatch{Readings: batch})
if err != nil {
return
}
subj := subject(p.plant, part)
// PublishAsync does not block on the round trip; ack handled separately.
_, _ = p.js.PublishAsync(subj, payload)
}
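Use protobuf, not JSON. At a million events per second JSON parsing alone will eat a core or three, and the smaller wire size is real bandwidth saved. PublishAsync keeps the publish path from blocking on each round trip, and JetStream tracks the acknowledgements in flight. Note that the listing keeps things short by discarding the PubAckFuture that PublishAsync returns and by silently dropping marshal errors, so a failed publish goes unnoticed. Production code should watch each future's Err() channel, or wait on PublishAsyncComplete() at shutdown, and count or retry failures.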
The Partitioned Consumer Pool
Each consumer owns a slice of partitions and pulls messages in batches. The batch pull is the single most important throughput lever: one Fetch call returns hundreds of messages, you process them, and you acknowledge the whole batch together.
// consumer.go
package events
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/nats-io/nats.go/jetstream"
"google.golang.org/protobuf/proto"
)
type Processor interface {
Handle(ctx context.Context, readings []*Reading) error
}
type PartitionConsumer struct {
js jetstream.JetStream
partition uint32
proc Processor
log *slog.Logger
}
func (c *PartitionConsumer) Run(ctx context.Context, plant string) error {
cons, err := c.js.CreateOrUpdateConsumer(ctx, "SENSORS",
jetstream.ConsumerConfig{
Durable: fmt.Sprintf("worker-%s-%d", plant, c.partition),
FilterSubject: subject(plant, c.partition),
AckPolicy: jetstream.AckExplicitPolicy,
MaxAckPending: 4000,
AckWait: 30 * time.Second,
})
if err != nil {
return fmt.Errorf("create consumer: %w", err)
}
for {
if ctx.Err() != nil {
return ctx.Err()
}
msgs, err := cons.Fetch(500, jetstream.FetchMaxWait(100*time.Millisecond))
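if err != nil {
c.log.Warn("fetch error", "partition", c.partition, "err", err)
// Back off before retrying, but stop immediately if the context is cancelled.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(200 * time.Millisecond):
}
continue
}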
c.processBatch(ctx, msgs)
}
}
func (c *PartitionConsumer) processBatch(ctx context.Context,
msgs jetstream.MessageBatch) {
var collected []jetstream.Msg
var readings []*Reading
for msg := range msgs.Messages() {
var rb ReadingBatch
if err := proto.Unmarshal(msg.Data(), &rb); err != nil {
// Poison message: terminate so it is not redelivered forever.
_ = msg.Term()
c.log.Error("undecodable message terminated",
"partition", c.partition)
continue
}
readings = append(readings, rb.Readings...)
collected = append(collected, msg)
}
if len(collected) == 0 {
return
}
if err := c.proc.Handle(ctx, readings); err != nil {
// Processing failed: nak the batch for redelivery with backoff.
for _, m := range collected {
_ = m.NakWithDelay(2 * time.Second)
}
c.log.Error("batch handling failed, nak'd",
"partition", c.partition, "msgs", len(collected), "err", err)
return
}
// Success: ack the whole batch.
for _, m := range collected {
_ = m.Ack()
}
}
Three things make this fast and correct. The Fetch(500, ...) pulls a batch instead of a message. The poison-message path calls Term() so an undecodable message does not loop forever and stall the partition. And ack-or-nak happens for the whole batch, so a transient downstream failure replays the batch rather than losing it.
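The text says each consumer owns a slice of partitions but does not show how partitions are handed out. One simple option, sketched here as an assumption rather than the article's method, is a static round-robin assignment: worker w gets every partition p where p modulo the worker count equals w. The function name assignPartitions and the worker count of 6 are illustrative only. Each worker would then start one PartitionConsumer per partition it owns.

```go
package main

import "fmt"

// assignPartitions spreads partitions 0..partitionCount-1 across workers
// round-robin, so worker sizes differ by at most one partition.
// It returns nil when either argument is not positive.
func assignPartitions(partitionCount, workers int) [][]uint32 {
	if partitionCount <= 0 || workers <= 0 {
		return nil
	}
	out := make([][]uint32, workers)
	for p := 0; p < partitionCount; p++ {
		w := p % workers
		out[w] = append(out[w], uint32(p))
	}
	return out
}

func main() {
	// Hypothetical deployment: 64 partitions shared by 6 worker processes.
	for w, parts := range assignPartitions(64, 6) {
		fmt.Printf("worker %d owns %d partitions: %v\n", w, len(parts), parts)
	}
}
```

Because the partition count stays fixed, adding or removing workers only changes which process owns a partition. It never changes which partition a sensor hashes to, so per-sensor ordering is unaffected.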
Aggregation, the Real Work
Most sensor processing is aggregation: rolling averages, threshold crossings, anomaly flags. Do it over the batch in one pass, grouped by sensor, and emit aggregates rather than raw points downstream. A million raw events in might be ten thousand aggregates out.
// aggregator.go
package events
import (
"context"
"math"
)
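// AggregateSink is where finished windows are written. The article does not
// show its definition; this interface is inferred from the WriteAggregates call in Handle.
type AggregateSink interface {
WriteAggregates(ctx context.Context, windows []*Window) error
}
type Aggregator struct {
sink AggregateSink
}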
type Window struct {
SensorID string
Count int
Sum float64
Min float64
Max float64
}
func (a *Aggregator) Handle(ctx context.Context, readings []*Reading) error {
windows := make(map[string]*Window, 1024)
for _, r := range readings {
w, ok := windows[r.SensorId]
if !ok {
w = &Window{SensorID: r.SensorId,
Min: math.MaxFloat64, Max: -math.MaxFloat64}
windows[r.SensorId] = w
}
w.Count++
w.Sum += r.Value
w.Min = math.Min(w.Min, r.Value)
w.Max = math.Max(w.Max, r.Value)
}
out := make([]*Window, 0, len(windows))
for _, w := range windows {
out = append(out, w)
}
return a.sink.WriteAggregates(ctx, out)
}
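The Window above carries Count, Sum, Min, and Max but leaves the derived values to the reader. As a small sketch of how the averages and threshold crossings mentioned earlier could be read off a window, the following adds two helpers. Mean and Exceeds are hypothetical names that do not appear in the article, and the sample values are made up for illustration.

```go
package main

import "fmt"

// Window mirrors the struct from aggregator.go.
type Window struct {
	SensorID string
	Count    int
	Sum      float64
	Min      float64
	Max      float64
}

// Mean returns the average reading in the window.
// An empty window yields 0 rather than dividing by zero.
func (w *Window) Mean() float64 {
	if w.Count == 0 {
		return 0
	}
	return w.Sum / float64(w.Count)
}

// Exceeds reports whether any reading in the window went above limit.
// It only needs Max, so no raw points have to be kept.
func (w *Window) Exceeds(limit float64) bool {
	return w.Count > 0 && w.Max > limit
}

func main() {
	// Illustrative window built from four readings: 70, 71, 71.5, 73.5.
	w := &Window{SensorID: "temp-0091", Count: 4, Sum: 286, Min: 70, Max: 73.5}
	fmt.Printf("%s mean=%.2f over73=%v\n", w.SensorID, w.Mean(), w.Exceeds(73))
}
```

Keeping only these four running values per sensor is what lets the aggregator emit a compact summary instead of forwarding every raw point.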
Capacity Planning
Right-size the partition count up front. Partitions are cheap to have and expensive to change later, because changing the count rehashes every sensor and breaks ordering during the transition. Pick a count comfortably above your worst-case consumer count — 64 or 128 — and run multiple partitions per consumer when load is light.
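A rough sizing pass: measure single-consumer throughput with a representative Handle, divide your target event rate by that number, add 50 percent headroom, and round up to a power of two. Modulo hashing works with any count, but a power of two keeps the partitions evenly divisible as you double the number of consumers.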
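The sizing pass is plain arithmetic, sketched below. The function name partitionsFor and the measured rate of 40,000 events per second per consumer are assumptions chosen for illustration; substitute your own benchmark result.

```go
package main

import (
	"fmt"
	"math"
)

// partitionsFor applies the sizing rule from the text: consumers needed at the
// target rate, plus fractional headroom (0.5 = 50 percent), rounded up to the
// next power of two. Non-positive rates fall back to a single partition.
func partitionsFor(targetRate, perConsumerRate, headroom float64) int {
	if targetRate <= 0 || perConsumerRate <= 0 {
		return 1
	}
	needed := int(math.Ceil(targetRate / perConsumerRate * (1 + headroom)))
	n := 1
	for n < needed {
		n <<= 1
	}
	return n
}

func main() {
	// 1,000,000 / 40,000 = 25 consumers; with 50% headroom 37.5, rounded up to 38;
	// the next power of two is 64.
	fmt.Println(partitionsFor(1_000_000, 40_000, 0.5)) // prints 64
}
```

With these example numbers the result matches the PartitionCount of 64 used in stream.go, which leaves room to run several partitions per consumer while load is light.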
Common Pitfalls
- Per-event publishing. One NATS publish per reading caps you well below your target. Batch on the edge.
- JSON on the hot path. JSON marshal and unmarshal is a CPU sink at scale. Use protobuf.
- Per-message ack. Acknowledging individually multiplies broker round trips. Ack the batch.
- Poison messages with no terminal path. An undecodable message that only gets nak’d redelivers forever and stalls its partition. Call Term().
- Changing partition count under load. Rehashing breaks per-sensor ordering mid-flight. Size partitions generously from the start.
Troubleshooting
- Symptom: Consumer throughput plateaus far below target. Cause: Fetching one message at a time or acking individually. Fix: Pull with Fetch(n, ...) and ack the whole batch.
- Symptom: One partition lags while others keep up. Cause: A hot sensor or a poison message stuck in redelivery. Fix: Inspect the partition, terminate undecodable messages, and consider splitting the hot sensor.
- Symptom: Memory climbs on consumers. Cause: MaxAckPending set too high, buffering more unacked messages than memory allows. Fix: Lower MaxAckPending to match real processing throughput.
- Symptom: Events redelivered repeatedly. Cause: Processing exceeds AckWait so JetStream assumes failure. Fix: Raise AckWait or shrink the fetch batch so a batch completes in time.
- Symptom: Ordering violations for a single sensor. Cause: That sensor’s events spread across partitions, often after a partition-count change. Fix: Use a stable hash and keep partition count fixed.
Wrapping Up
At millions of events per second the architecture is the performance work: partition by sensor for ordering, batch from edge to consumer, and aggregate early so downstream sees thousands of summaries instead of millions of raw points. Get the partition count right on day one and the system scales horizontally by adding consumers. The JetStream reference and consumer semantics are documented at docs.nats.io.