background-shape
Golang article cover illustration on a gradient background
April 10, 2026 · 8 min read · by Muhammad Amal programming
Advertisement

TL;DR — Decouple ingestion from processing so a slow consumer cannot stall a fast producer / Use a ring buffer for bounded memory and an explicit drop policy / Make backpressure a deliberate decision, not an accident

An industrial line generates telemetry the way a fire hose generates water. Vision systems emit detection events, PLCs emit cycle counters, sensors emit temperature and vibration samples — and they all arrive on their own schedule, indifferent to whether your database is having a slow moment. The naive pipeline, where ingestion writes straight to storage, works perfectly until the day storage hiccups. Then the hiccup propagates backward, the ingestion goroutine blocks, and you start losing data at the source.

A high-throughput telemetry pipeline in Go is fundamentally an exercise in decoupling. The producer side and the consumer side must run at independent speeds, connected by a bounded buffer that absorbs bursts. When the buffer fills — and it will — you need an explicit, considered policy for what happens next. Drop the oldest sample? Drop the newest? Block? Each is correct for some telemetry and catastrophic for others.

Advertisement

This article builds that pipeline end to end in Go 1.24: a lock-free-ish ring buffer for the hot path, a fan-out of worker goroutines, batched writes downstream, and graceful shutdown that drains in flight data. The pipeline pairs naturally with the edge vision stack from the industrial edge architecture article .

The Shape of the Problem

Three components, three different rate characteristics:

  • Producers — fast, bursty, must never block. A blocked producer is a dropped sensor reading or a stalled camera callback.
  • Buffer — bounded. Unbounded buffers do not solve backpressure, they defer it until an out-of-memory crash.
  • Consumers — slower, variable latency, batch-friendly. Databases and message brokers love batched writes and hate per-event round trips.

The buffer is where the design lives. A plain Go channel is a bounded buffer with blocking semantics, and for many pipelines that is exactly right. But when the requirement is “never block the producer, drop under pressure instead,” you want a ring buffer with an explicit overwrite policy.

A Ring Buffer for the Hot Path

A ring buffer gives you bounded memory, contiguous allocation, and a clear overwrite-oldest semantic. Here is a generic, mutex-guarded implementation that is fast enough for hundreds of thousands of events per second and simple enough to reason about.

// ringbuffer.go
package telemetry

import "sync"

// RingBuffer is a bounded buffer that overwrites the oldest entry when full.
type RingBuffer[T any] struct {
	mu       sync.Mutex
	buf      []T
	head     int   // next write position
	tail     int   // next read position
	size     int   // number of valid entries
	capacity int
	dropped  uint64
}

func NewRingBuffer[T any](capacity int) *RingBuffer[T] {
	if capacity <= 0 {
		panic("ring buffer capacity must be positive")
	}
	return &RingBuffer[T]{
		buf:      make([]T, capacity),
		capacity: capacity,
	}
}

// Push never blocks. If the buffer is full it overwrites the oldest entry
// and increments the dropped counter.
func (r *RingBuffer[T]) Push(v T) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.size == r.capacity {
		r.tail = (r.tail + 1) % r.capacity // discard oldest
		r.dropped++
	} else {
		r.size++
	}
	r.buf[r.head] = v
	r.head = (r.head + 1) % r.capacity
}

// PopBatch removes up to n entries into dst and returns the count taken.
func (r *RingBuffer[T]) PopBatch(dst []T) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	n := min(len(dst), r.size)
	for i := 0; i < n; i++ {
		dst[i] = r.buf[r.tail]
		r.tail = (r.tail + 1) % r.capacity
	}
	r.size -= n
	return n
}

func (r *RingBuffer[T]) Dropped() uint64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.dropped
}

The dropped counter is not decoration. It is your single most important signal that the consumer side cannot keep up. Export it as a metric and alert on its rate of change.

The Event Model

Keep the event struct flat and free of pointers where you can. Garbage collection pressure is a real cost at high event rates, and a struct that fits in a cache line and contains no pointers is one the GC barely notices.

// event.go
package telemetry

import "time"

type Kind uint8

const (
	KindSensor Kind = iota
	KindDetection
	KindCycle
)

// Event is intentionally pointer-free for low GC pressure.
type Event struct {
	Timestamp int64   // unix nanoseconds
	SourceID  uint32  // sensor or camera id
	Kind      Kind
	Value     float64
	Sequence  uint64  // per-source monotonic counter for gap detection
}

func NewSensorEvent(src uint32, seq uint64, v float64) Event {
	return Event{
		Timestamp: time.Now().UnixNano(),
		SourceID:  src,
		Kind:      KindSensor,
		Value:     v,
		Sequence:  seq,
	}
}

The per-source Sequence lets consumers detect gaps. If sequence numbers jump, you dropped events somewhere, and you want to know that downstream rather than guess.

The Pipeline

Now assemble it. An ingest stage feeds the ring buffer. A pump goroutine drains the buffer in batches and fans those batches out to a pool of workers over a channel. Workers do the slow work — serialization, network writes — in parallel.

// pipeline.go
package telemetry

import (
	"context"
	"log/slog"
	"sync"
	"time"
)

type Sink interface {
	WriteBatch(ctx context.Context, events []Event) error
}

type Pipeline struct {
	ring      *RingBuffer[Event]
	batchChan chan []Event
	sink      Sink
	workers   int
	batchSize int
	flushIvl  time.Duration
	wg        sync.WaitGroup
	log       *slog.Logger
}

func NewPipeline(sink Sink, log *slog.Logger) *Pipeline {
	return &Pipeline{
		ring:      NewRingBuffer[Event](1 << 16), // 65536 events
		batchChan: make(chan []Event, 64),
		sink:      sink,
		workers:   8,
		batchSize: 512,
		flushIvl:  50 * time.Millisecond,
		log:       log,
	}
}

// Ingest is the producer entry point. It never blocks.
func (p *Pipeline) Ingest(e Event) {
	p.ring.Push(e)
}

// Run starts the pump and workers. It returns when ctx is cancelled and
// all in-flight events have been drained.
func (p *Pipeline) Run(ctx context.Context) {
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go p.worker(i)
	}
	p.wg.Add(1)
	go p.pump(ctx)
	p.wg.Wait()
}

// pump drains the ring buffer in batches on a ticker and on size triggers.
func (p *Pipeline) pump(ctx context.Context) {
	defer p.wg.Done()
	defer close(p.batchChan)
	ticker := time.NewTicker(p.flushIvl)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			p.drain() // final drain on shutdown
			return
		case <-ticker.C:
			p.drain()
		}
	}
}

func (p *Pipeline) drain() {
	for {
		batch := make([]Event, p.batchSize)
		n := p.ring.PopBatch(batch)
		if n == 0 {
			return
		}
		p.batchChan <- batch[:n]
	}
}

func (p *Pipeline) worker(id int) {
	defer p.wg.Done()
	for batch := range p.batchChan {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		if err := p.writeWithRetry(ctx, batch); err != nil {
			p.log.Error("batch write failed permanently",
				"worker", id, "events", len(batch), "err", err)
		}
		cancel()
	}
}

Retry With Bounded Backoff

A worker that retries forever is a worker that holds a batch hostage while the buffer overflows behind it. Retry a bounded number of times with exponential backoff, then give up and log loudly so the dropped batch is visible.

// retry.go
package telemetry

import (
	"context"
	"errors"
	"time"
)

func (p *Pipeline) writeWithRetry(ctx context.Context, batch []Event) error {
	const maxAttempts = 4
	backoff := 100 * time.Millisecond

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		lastErr = p.sink.WriteBatch(ctx, batch)
		if lastErr == nil {
			return nil
		}
		if ctx.Err() != nil {
			return errors.Join(lastErr, ctx.Err())
		}
		p.log.Warn("batch write retry",
			"attempt", attempt, "err", lastErr)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		backoff *= 2
	}
	return errors.Join(errors.New("batch write exhausted retries"), lastErr)
}

Graceful Shutdown

The shutdown path is where pipelines lose data they should have kept. When the process gets a SIGTERM, you want to stop accepting new events, drain the ring buffer, flush every in-flight batch, and only then exit.

// main.go
package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	"example.com/telemetry"
)

func main() {
	log := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	sink := telemetry.NewTimescaleSink(os.Getenv("TSDB_DSN"))
	pipe := telemetry.NewPipeline(sink, log)

	ctx, stop := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	go ingestFromSources(ctx, pipe) // producers call pipe.Ingest

	log.Info("telemetry pipeline started")
	pipe.Run(ctx) // blocks until ctx cancelled and drained
	log.Info("telemetry pipeline drained, exiting")
}

Because pump calls drain one last time after ctx.Done() and Run only returns after wg.Wait(), every event already in the ring buffer reaches a worker before the process exits. The events you genuinely cannot save are the ones still in flight at the producer when the signal arrives, and a short producer-side grace period covers most of those.

Common Pitfalls

  • Unbounded channels or slices. They do not provide backpressure, they just convert it into an out-of-memory crash later.
  • No drop policy. When the buffer fills, the behavior must be a decision you made, not whatever the runtime happens to do.
  • Retrying forever in a worker. One stuck batch blocks a worker, shrinking effective throughput and feeding overflow.
  • Pointer-heavy event structs. At high rates, GC scanning of pointer-laden events becomes a measurable tax.
  • Ignoring the dropped counter. It is the earliest, clearest signal that your consumer side is underprovisioned.

Troubleshooting

  • Symptom: Memory grows without bound until OOM. Cause: An unbounded buffer somewhere — a channel with no cap or a growing slice. Fix: Replace it with a fixed-capacity ring buffer and a defined drop policy.
  • Symptom: Producers stall intermittently. Cause: The producer path touches a blocking operation, often a full channel. Fix: Route producers exclusively through the non-blocking ring buffer Push.
  • Symptom: dropped counter climbs steadily. Cause: Consumer throughput is below ingest rate. Fix: Add workers, increase batch size, or check the sink for a slowdown.
  • Symptom: Data lost on deploy or restart. Cause: Shutdown exits before draining. Fix: Ensure the pump does a final drain after context cancellation and Run waits on the WaitGroup.
  • Symptom: Latency spikes every few seconds. Cause: GC pauses from pointer-heavy events or large per-batch allocations. Fix: Flatten the event struct and pool batch slices with sync.Pool.

What’s Next

A telemetry pipeline that survives production is one where backpressure is explicit, the buffer is bounded, and shutdown drains cleanly. Get those three right and the pipeline absorbs bursts and degrades predictably instead of collapsing. The next step is scaling past a single process — distributing ingestion across nodes with a message broker, which is exactly where processing sensor events at scale with Go and NATS picks up. The Go concurrency reference is at go.dev/doc .

Advertisement