Building a High-Throughput Telemetry Pipeline in Go
TL;DR:
- Decouple ingestion from processing so a slow consumer cannot stall a fast producer.
- Use a ring buffer for bounded memory and an explicit drop policy.
- Make backpressure a deliberate decision, not an accident.
An industrial line generates telemetry the way a fire hose generates water. Vision systems emit detection events, PLCs emit cycle counters, sensors emit temperature and vibration samples — and they all arrive on their own schedule, indifferent to whether your database is having a slow moment. The naive pipeline, where ingestion writes straight to storage, works perfectly until the day storage hiccups. Then the hiccup propagates backward, the ingestion goroutine blocks, and you start losing data at the source.
A high-throughput telemetry pipeline in Go is fundamentally an exercise in decoupling. The producer side and the consumer side must run at independent speeds, connected by a bounded buffer that absorbs bursts. When the buffer fills — and it will — you need an explicit, considered policy for what happens next. Drop the oldest sample? Drop the newest? Block? Each is correct for some telemetry and catastrophic for others.
This article builds that pipeline end to end in Go 1.24: a mutex-guarded ring buffer for the hot path, a fan-out of worker goroutines, batched writes downstream, and graceful shutdown that drains in-flight data. The pipeline pairs naturally with the edge vision stack from the industrial edge architecture article.
The Shape of the Problem
Three components, three different rate characteristics:
- Producers — fast, bursty, must never block. A blocked producer is a dropped sensor reading or a stalled camera callback.
- Buffer — bounded. Unbounded buffers do not solve backpressure, they defer it until an out-of-memory crash.
- Consumers — slower, variable latency, batch-friendly. Databases and message brokers love batched writes and hate per-event round trips.
The buffer is where the design lives. A plain Go channel is a bounded buffer with blocking semantics, and for many pipelines that is exactly right. But when the requirement is “never block the producer, drop under pressure instead,” you want a ring buffer with an explicit overwrite policy.
A Ring Buffer for the Hot Path
A ring buffer gives you bounded memory, contiguous allocation, and a clear overwrite-oldest semantic. Here is a generic, mutex-guarded implementation that is fast enough for hundreds of thousands of events per second and simple enough to reason about.
```go
// ringbuffer.go
package telemetry

import "sync"

// RingBuffer is a bounded buffer that overwrites the oldest entry when full.
// All methods are safe for concurrent use; a single mutex guards the state.
type RingBuffer[T any] struct {
	mu       sync.Mutex
	buf      []T
	head     int // next write position
	tail     int // next read position
	size     int // number of valid entries
	capacity int
	dropped  uint64
}

func NewRingBuffer[T any](capacity int) *RingBuffer[T] {
	if capacity <= 0 {
		panic("ring buffer capacity must be positive")
	}
	return &RingBuffer[T]{
		buf:      make([]T, capacity),
		capacity: capacity,
	}
}

// Push never blocks. If the buffer is full it overwrites the oldest entry
// and increments the dropped counter.
func (r *RingBuffer[T]) Push(v T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.size == r.capacity {
		// Full means head == tail, so the write below lands on the oldest entry.
		r.tail = (r.tail + 1) % r.capacity // discard oldest
		r.dropped++
	} else {
		r.size++
	}
	r.buf[r.head] = v
	r.head = (r.head + 1) % r.capacity
}

// PopBatch moves up to len(dst) entries, oldest first, into dst and returns
// the count taken.
func (r *RingBuffer[T]) PopBatch(dst []T) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	n := min(len(dst), r.size) // built-in min requires Go 1.21+
	var zero T
	for i := 0; i < n; i++ {
		dst[i] = r.buf[r.tail]
		r.buf[r.tail] = zero // release references if T ever contains pointers
		r.tail = (r.tail + 1) % r.capacity
	}
	r.size -= n
	return n
}

// Dropped returns the total number of entries overwritten since creation.
func (r *RingBuffer[T]) Dropped() uint64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.dropped
}
```
The dropped counter is not decoration. It is your single most important signal that the consumer side cannot keep up. Export it as a metric and alert on its rate of change.
The Event Model
Keep the event struct flat and free of pointers where you can. Garbage collection pressure is a real cost at high event rates, and a struct that fits in a cache line and contains no pointers is one the GC barely notices.
```go
// event.go
package telemetry

import "time"

type Kind uint8

const (
	KindSensor Kind = iota
	KindDetection
	KindCycle
)

// Event is intentionally pointer-free for low GC pressure.
type Event struct {
	Timestamp int64  // unix nanoseconds
	SourceID  uint32 // sensor or camera id
	Kind      Kind
	Value     float64
	Sequence  uint64 // per-source monotonic counter for gap detection
}

func NewSensorEvent(src uint32, seq uint64, v float64) Event {
	return Event{
		Timestamp: time.Now().UnixNano(),
		SourceID:  src,
		Kind:      KindSensor,
		Value:     v,
		Sequence:  seq,
	}
}
```
The per-source Sequence lets consumers detect gaps. If sequence numbers jump, you dropped events somewhere, and you want to know that downstream rather than guess.
The Pipeline
Now assemble it. An ingest stage feeds the ring buffer. A pump goroutine drains the buffer in batches and fans those batches out to a pool of workers over a channel. Workers do the slow work — serialization, network writes — in parallel.
```go
// pipeline.go
package telemetry

import (
	"context"
	"log/slog"
	"sync"
	"time"
)

// Sink is the downstream writer. Every worker calls WriteBatch concurrently,
// so implementations must be safe for concurrent use.
type Sink interface {
	WriteBatch(ctx context.Context, events []Event) error
}

type Pipeline struct {
	ring      *RingBuffer[Event]
	batchChan chan []Event
	sink      Sink
	workers   int
	batchSize int
	flushIvl  time.Duration
	wg        sync.WaitGroup
	log       *slog.Logger
}

func NewPipeline(sink Sink, log *slog.Logger) *Pipeline {
	return &Pipeline{
		ring:      NewRingBuffer[Event](1 << 16), // 65536 events
		batchChan: make(chan []Event, 64),
		sink:      sink,
		workers:   8,
		batchSize: 512,
		flushIvl:  50 * time.Millisecond,
		log:       log,
	}
}

// Ingest is the producer entry point. It never blocks.
func (p *Pipeline) Ingest(e Event) {
	p.ring.Push(e)
}

// Run starts the pump and workers. It returns when ctx is cancelled and
// all in-flight events have been drained.
func (p *Pipeline) Run(ctx context.Context) {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go p.worker(i)
	}
	p.wg.Add(1)
	go p.pump(ctx)
	p.wg.Wait()
}

// pump empties the ring buffer on every tick. There is no size trigger, so an
// event can sit in the ring for up to one flush interval (longer if batchChan
// is full). Closing batchChan after the final drain lets the workers exit.
func (p *Pipeline) pump(ctx context.Context) {
	defer p.wg.Done()
	defer close(p.batchChan)
	ticker := time.NewTicker(p.flushIvl)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			p.drain() // final drain on shutdown
			return
		case <-ticker.C:
			p.drain()
		}
	}
}

// drain pops batches until the ring is empty. Each batch gets a fresh slice
// because ownership passes to a worker on send. The send blocks while
// batchChan is full, which throttles the pump but never the producers.
func (p *Pipeline) drain() {
	for {
		batch := make([]Event, p.batchSize)
		n := p.ring.PopBatch(batch)
		if n == 0 {
			return
		}
		p.batchChan <- batch[:n]
	}
}

func (p *Pipeline) worker(id int) {
	defer p.wg.Done()
	for batch := range p.batchChan {
		// Detached from Run's ctx on purpose: batches drained during shutdown
		// still get their full write budget.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		if err := p.writeWithRetry(ctx, batch); err != nil {
			p.log.Error("batch write failed permanently",
				"worker", id, "events", len(batch), "err", err)
		}
		cancel()
	}
}
```
Retry With Bounded Backoff
A worker that retries forever is a worker that holds a batch hostage while the buffer overflows behind it. Retry a bounded number of times with exponential backoff, then give up and log loudly so the dropped batch is visible.
```go
// retry.go
package telemetry

import (
	"context"
	"errors"
	"time"
)

// writeWithRetry calls the sink up to maxAttempts times, doubling the sleep
// between attempts, and gives up early if ctx expires.
func (p *Pipeline) writeWithRetry(ctx context.Context, batch []Event) error {
	const maxAttempts = 4
	backoff := 100 * time.Millisecond
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		lastErr = p.sink.WriteBatch(ctx, batch)
		if lastErr == nil {
			return nil
		}
		if ctx.Err() != nil {
			return errors.Join(lastErr, ctx.Err())
		}
		if attempt == maxAttempts {
			break // do not sleep after the final failure
		}
		p.log.Warn("batch write retry",
			"attempt", attempt, "err", lastErr)
		select {
		case <-ctx.Done():
			return errors.Join(lastErr, ctx.Err())
		case <-time.After(backoff):
		}
		backoff *= 2
	}
	return errors.Join(errors.New("batch write exhausted retries"), lastErr)
}
```
Graceful Shutdown
The shutdown path is where pipelines lose data they should have kept. When the process gets a SIGTERM, you want to stop accepting new events, drain the ring buffer, flush every in-flight batch, and only then exit.
```go
// main.go
package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	"example.com/telemetry"
)

func main() {
	log := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	// NewTimescaleSink is not shown in this article; any telemetry.Sink works here.
	sink := telemetry.NewTimescaleSink(os.Getenv("TSDB_DSN"))
	pipe := telemetry.NewPipeline(sink, log)

	ctx, stop := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// ingestFromSources (also not shown) runs the producers, which call pipe.Ingest.
	go ingestFromSources(ctx, pipe)

	log.Info("telemetry pipeline started")
	pipe.Run(ctx) // blocks until ctx is cancelled and the buffer is drained
	log.Info("telemetry pipeline drained, exiting")
}
```
Because pump calls drain one last time after ctx.Done() and Run only returns after wg.Wait(), every event already in the ring buffer reaches a worker before the process exits. The events you genuinely cannot save are the ones still in flight at the producer when the signal arrives, and a short producer-side grace period covers most of those.
Common Pitfalls
- Unbounded queues or slices. They do not provide backpressure; they just convert it into an out-of-memory crash later.
- No drop policy. When the buffer fills, the behavior must be a decision you made, not whatever the runtime happens to do.
- Retrying forever in a worker. One stuck batch blocks a worker, shrinking effective throughput and feeding overflow.
- Pointer-heavy event structs. At high rates, GC scanning of pointer-laden events becomes a measurable tax.
- Ignoring the dropped counter. It is the earliest, clearest signal that your consumer side is underprovisioned.
Troubleshooting
- Symptom: Memory grows without bound until OOM. Cause: An unbounded buffer somewhere, typically a slice or hand-rolled queue that keeps growing on append. Fix: Replace it with a fixed-capacity ring buffer and a defined drop policy.
- Symptom: Producers stall intermittently. Cause: The producer path touches a blocking operation, often a send on a full channel. Fix: Route producers exclusively through the non-blocking ring buffer Push.
- Symptom: The dropped counter climbs steadily. Cause: Consumer throughput is below the ingest rate. Fix: Add workers, increase the batch size, or check the sink for a slowdown.
- Symptom: Data lost on deploy or restart. Cause: Shutdown exits before draining. Fix: Ensure the pump does a final drain after context cancellation and Run waits on the WaitGroup.
- Symptom: Latency spikes every few seconds. Cause: GC pauses from pointer-heavy events or large per-batch allocations. Fix: Flatten the event struct and pool batch slices with sync.Pool.
What’s Next
A telemetry pipeline that survives production is one where backpressure is explicit, the buffer is bounded, and shutdown drains cleanly. Get those three right and the pipeline absorbs bursts and degrades predictably instead of collapsing. The next step is scaling past a single process: distributing ingestion across nodes with a message broker, which is exactly where the article on processing sensor events at scale with Go and NATS picks up. The Go concurrency reference is at go.dev/doc.