Building an ETL Pipeline in Go
October 12, 2022 · 5 min read · by Muhammad Amal programming

TL;DR — Go ETL: pgx for Postgres, COPY for bulk inserts, channels for concurrent stages. Faster CPU-bound transforms than Python, harder for complex data shapes. Reach for Go when throughput matters more than transform flexibility.

After the Python version, Go for the same job. Different ergonomics; different strengths.

When Go wins

Three scenarios where Go beats Python for ETL:

High throughput, simple transforms. Streaming 10K rows/sec through filter/map operations. Go's compiled code handles CPU-bound work faster than CPython, and goroutines run in parallel with no GIL.

Long-running consumer pipelines. Kafka consumer that runs for weeks, processing millions of events. Memory profile is more predictable.

Embedded ETL. A 5MB binary running on edge devices doing local transforms before shipping. Python’s runtime size is awkward there.

For analytical workloads (pivot, aggregate, join in DataFrame style): stick with Python + Polars.

The pipeline

Same job as the Python post — sync orders from Postgres to BigQuery:

package main

import (
    "context"
    "log"
    "os"
    "strings"
    "time"

    "cloud.google.com/go/bigquery"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"
)

type Order struct {
    ID          int64     `bigquery:"id"`
    CustomerID  int64     `bigquery:"customer_id"`
    Status      string    `bigquery:"status"`
    TotalCents  int64     `bigquery:"total_cents"`
    TotalDollar float64   `bigquery:"total_dollars"`
    CreatedAt   time.Time `bigquery:"created_at"`
    UpdatedAt   time.Time `bigquery:"updated_at"`
}

type Pipeline struct {
    pg *pgxpool.Pool
    bq *bigquery.Client
}

func (p *Pipeline) GetWatermark(ctx context.Context, table string) (time.Time, error) {
    var wm time.Time
    err := p.pg.QueryRow(ctx,
        `SELECT high_watermark FROM etl_state WHERE table_name = $1`,
        table).Scan(&wm)
    if err == pgx.ErrNoRows {
        return time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), nil
    }
    return wm, err
}

func (p *Pipeline) SetWatermark(ctx context.Context, table string, wm time.Time) error {
    _, err := p.pg.Exec(ctx, `
        INSERT INTO etl_state (table_name, high_watermark, updated_at)
        VALUES ($1, $2, NOW())
        ON CONFLICT (table_name) DO UPDATE
        SET high_watermark = EXCLUDED.high_watermark, updated_at = NOW()
    `, table, wm)
    return err
}

func (p *Pipeline) Extract(ctx context.Context, since time.Time) ([]Order, error) {
    rows, err := p.pg.Query(ctx, `
        SELECT id, customer_id, status, total_cents, created_at, updated_at
        FROM orders WHERE updated_at > $1
        ORDER BY updated_at LIMIT 50000
    `, since)
    if err != nil { return nil, err }
    defer rows.Close()

    var orders []Order
    for rows.Next() {
        var o Order
        if err := rows.Scan(&o.ID, &o.CustomerID, &o.Status,
            &o.TotalCents, &o.CreatedAt, &o.UpdatedAt); err != nil {
            return nil, err
        }
        // Transform inline
        o.TotalDollar = float64(o.TotalCents) / 100.0
        o.Status = strings.ToLower(o.Status)
        orders = append(orders, o)
    }
    return orders, rows.Err()
}

func (p *Pipeline) Load(ctx context.Context, orders []Order) error {
    if len(orders) == 0 { return nil }
    inserter := p.bq.Dataset("analytics").Table("orders").Inserter()
    return inserter.Put(ctx, orders)
}

func (p *Pipeline) Sync(ctx context.Context) error {
    wm, err := p.GetWatermark(ctx, "orders")
    if err != nil { return err }

    orders, err := p.Extract(ctx, wm)
    if err != nil { return err }
    if len(orders) == 0 {
        log.Println("no new rows")
        return nil
    }

    if err := p.Load(ctx, orders); err != nil { return err }

    newWM := orders[len(orders)-1].UpdatedAt
    if err := p.SetWatermark(ctx, "orders", newWM); err != nil { return err }

    log.Printf("synced %d rows, new watermark: %v", len(orders), newWM)
    return nil
}

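func main() {
    ctx := context.Background()
    pg, err := pgxpool.Connect(ctx, os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatalf("connect postgres: %v", err)
    }
    defer pg.Close()
    bq, err := bigquery.NewClient(ctx, os.Getenv("GCP_PROJECT"))
    if err != nil {
        log.Fatalf("create bigquery client: %v", err)
    }
    defer bq.Close()
    p := &Pipeline{pg: pg, bq: bq}
    if err := p.Sync(ctx); err != nil {
        log.Fatal(err) // exits non-zero; deferred Close calls do not run
    }
}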

~120 lines. Compiles to a ~15 MB binary. Same shape as Python.
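
The inline transform in Extract is hard to test without a database. One option, sketched here, is to pull it into a pure function. The `normalize` name and the trimmed-down `Order` struct are illustrative assumptions, not part of the pipeline above.

```go
package main

import (
    "fmt"
    "strings"
)

// Order is a trimmed-down copy of the pipeline's struct, kept local so the
// sketch compiles on its own.
type Order struct {
    Status      string
    TotalCents  int64
    TotalDollar float64
}

// normalize (a hypothetical helper) applies the same two transforms as
// Extract: cents to dollars and a lowercased status.
func normalize(o Order) Order {
    o.TotalDollar = float64(o.TotalCents) / 100.0
    o.Status = strings.ToLower(o.Status)
    return o
}

func main() {
    got := normalize(Order{Status: "SHIPPED", TotalCents: 1250})
    fmt.Println(got.Status, got.TotalDollar) // prints: shipped 12.5
}
```

Extract would then call `normalize(o)` after each Scan, and the transform gets a unit test that needs no Postgres.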

Concurrent stages with channels

For high-throughput pipelines, run extract / transform / load concurrently:

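func (p *Pipeline) ConcurrentSync(ctx context.Context) error {
    // Cancel on return so a stage blocked on a channel send can exit.
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    wm, err := p.GetWatermark(ctx, "orders")
    if err != nil {
        return err
    }

    extractCh := make(chan Order, 1000)
    transformCh := make(chan Order, 1000)
    extractErr := make(chan error, 1) // buffered: the extractor sends exactly once

    // Extractor
    go func() {
        defer close(extractCh)
        // Assumption: same query as Extract, minus the LIMIT (elided in the original).
        rows, err := p.pg.Query(ctx, `
            SELECT id, customer_id, status, total_cents, created_at, updated_at
            FROM orders WHERE updated_at > $1 ORDER BY updated_at
        `, wm)
        if err != nil {
            extractErr <- err
            return
        }
        defer rows.Close()
        for rows.Next() {
            var o Order
            if err := rows.Scan(&o.ID, &o.CustomerID, &o.Status,
                &o.TotalCents, &o.CreatedAt, &o.UpdatedAt); err != nil {
                extractErr <- err
                return
            }
            select {
            case extractCh <- o:
            case <-ctx.Done():
                extractErr <- ctx.Err()
                return
            }
        }
        extractErr <- rows.Err()
    }()

    // Transformer
    go func() {
        defer close(transformCh)
        for o := range extractCh {
            o.TotalDollar = float64(o.TotalCents) / 100
            o.Status = strings.ToLower(o.Status)
            select {
            case transformCh <- o:
            case <-ctx.Done():
                return
            }
        }
    }()

    // Loader
    inserter := p.bq.Dataset("analytics").Table("orders").Inserter()
    batch := make([]Order, 0, 1000)
    for o := range transformCh {
        batch = append(batch, o)
        if len(batch) >= 1000 {
            if err := inserter.Put(ctx, batch); err != nil {
                return err
            }
            batch = batch[:0]
        }
    }
    if len(batch) > 0 {
        if err := inserter.Put(ctx, batch); err != nil {
            return err
        }
    }
    return <-extractErr
}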

Extraction, transformation, and loading run concurrently, so the Postgres read keeps going while a BigQuery insert is in flight. For simple pipelines this is overkill; for high-throughput pipelines it can double or triple throughput.
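
The three-stage shape is easier to see without the database calls. The sketch below runs the same extract / transform / load stages over an in-memory slice. `runPipeline`, the reduced `Order` struct, and returning the batches instead of sending them anywhere are all assumptions made so the example runs on its own.

```go
package main

import (
    "fmt"
    "strings"
)

// Order is a reduced stand-in for the pipeline's struct.
type Order struct {
    ID          int64
    Status      string
    TotalCents  int64
    TotalDollar float64
}

// runPipeline pushes orders through extract -> transform -> load stages
// connected by buffered channels and returns the batches the loader built.
// batchSize is expected to be positive.
func runPipeline(src []Order, batchSize int) [][]Order {
    extractCh := make(chan Order, batchSize)
    transformCh := make(chan Order, batchSize)

    // Extractor: stands in for the rows.Next() loop.
    go func() {
        defer close(extractCh)
        for _, o := range src {
            extractCh <- o
        }
    }()

    // Transformer: same two transforms as the pipeline.
    go func() {
        defer close(transformCh)
        for o := range extractCh {
            o.TotalDollar = float64(o.TotalCents) / 100
            o.Status = strings.ToLower(o.Status)
            transformCh <- o
        }
    }()

    // Loader: collects fixed-size batches, then flushes the remainder.
    var batches [][]Order
    batch := make([]Order, 0, batchSize)
    for o := range transformCh {
        batch = append(batch, o)
        if len(batch) == batchSize {
            // Copy before storing: batch[:0] reuses the backing array.
            batches = append(batches, append([]Order(nil), batch...))
            batch = batch[:0]
        }
    }
    if len(batch) > 0 {
        batches = append(batches, append([]Order(nil), batch...))
    }
    return batches
}

func main() {
    src := make([]Order, 5)
    for i := range src {
        src[i] = Order{ID: int64(i + 1), Status: "PAID", TotalCents: 1999}
    }
    for _, b := range runPipeline(src, 2) {
        fmt.Println(len(b), b[0].ID, b[0].Status)
    }
}
```

With one goroutine per stage, row order is preserved end to end, which matters if the watermark is taken from the last row loaded.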

COPY for Postgres bulk loads

If the destination is Postgres (instead of BigQuery), use COPY for large inserts:

func (p *Pipeline) BulkInsert(ctx context.Context, orders []Order) error {
    rows := make([][]any, len(orders))
    for i, o := range orders {
        rows[i] = []any{o.ID, o.CustomerID, o.Status, o.TotalCents, o.CreatedAt, o.UpdatedAt}
    }
    _, err := p.pg.CopyFrom(ctx,
        pgx.Identifier{"orders"}, // table name, in the type CopyFrom declares
        []string{"id", "customer_id", "status", "total_cents", "created_at", "updated_at"},
        pgx.CopyFromRows(rows),
    )
    return err
}

COPY is typically 10-100× faster than row-by-row INSERT for bulk loads. Make it the default for Postgres-to-Postgres pipelines.

Covered separately in the bulk loading post.
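
BulkInsert builds the whole `[][]any` slice before COPY starts. When rows arrive over a channel, a streaming source avoids that buffer. The sketch below assumes pgx's `CopyFromSource` interface has the three methods `Next`, `Values`, and `Err`; it mirrors them in a local `copySource` interface so the example compiles without the pgx module. `chanSource` and `drain` are hypothetical names.

```go
package main

import "fmt"

// copySource mirrors the method set of pgx's CopyFromSource interface.
// It is declared locally so the sketch has no external dependency.
type copySource interface {
    Next() bool
    Values() ([]interface{}, error)
    Err() error
}

// Order is a reduced stand-in for the pipeline's struct.
type Order struct {
    ID         int64
    Status     string
    TotalCents int64
}

// chanSource is a hypothetical adapter that feeds COPY from a channel.
type chanSource struct {
    ch  <-chan Order
    cur Order
}

// Next blocks until the next order arrives and reports false once the
// channel is closed and drained.
func (s *chanSource) Next() bool {
    o, ok := <-s.ch
    s.cur = o
    return ok
}

// Values returns the current row in column order.
func (s *chanSource) Values() ([]interface{}, error) {
    return []interface{}{s.cur.ID, s.cur.Status, s.cur.TotalCents}, nil
}

// Err always returns nil because a channel receive cannot fail.
func (s *chanSource) Err() error { return nil }

// drain stands in for CopyFrom: it consumes the source and counts rows.
func drain(src copySource) (int, error) {
    n := 0
    for src.Next() {
        if _, err := src.Values(); err != nil {
            return n, err
        }
        n++
    }
    return n, src.Err()
}

func main() {
    ch := make(chan Order, 2)
    ch <- Order{ID: 1, Status: "paid", TotalCents: 1999}
    ch <- Order{ID: 2, Status: "paid", TotalCents: 500}
    close(ch)

    n, err := drain(&chanSource{ch: ch})
    fmt.Println(n, err)
}
```

Under that assumption, a `*chanSource` could be passed to CopyFrom in place of `pgx.CopyFromRows(rows)`, with the column list matching the order used in `Values`.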

Error handling

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer cancel()

    p := &Pipeline{...}

    for attempt := 1; attempt <= 3; attempt++ {
        err := p.Sync(ctx)
        if err == nil {
            return
        }
        log.Printf("attempt %d failed: %v", attempt, err)
        if attempt < 3 {
            time.Sleep(time.Duration(1<<attempt) * time.Second)
        }
    }
    log.Fatal("all attempts failed")
}

Exit non-zero on failure for cron / monitoring to pick up.

Resource footprint

For the same workload (1M rows/hour sync):

Language   RSS       CPU              Binary size
Python     ~250 MB   ~80% of 1 core   ~10 MB Python + deps
Go         ~50 MB    ~20% of 1 core   15 MB binary

Go uses less; Python is more flexible.

When NOT to use Go for ETL

  • Complex pivots, joins, aggregations across DataFrames → Polars/pandas
  • Data scientist team members will edit → Python (familiarity)
  • Quick prototypes → Python (faster iteration)
  • Pipeline does most of its work in third-party libs (sklearn, etc.) → Python

For boring data movement: Go. For interesting data transformation: Python.

Common Pitfalls

database/sql instead of pgx native. Slower scanning. Use pgx directly.

Forgetting rows.Close(). Connection leak. Always defer.

No timeout / context. Long queries hold connections forever. Always context-bound.

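Mutating slice during range. The loop variable is a copy, so writes to it never reach the slice, and deleting elements mid-loop can index past the new length. Use index loops if mutating.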

Single goroutine for everything. Extraction blocks transform blocks load. Channels enable concurrency.

BigQuery client default settings. Insufficient retry, no idempotency. Tune for your throughput.
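
Of the pitfalls above, the slice one is the easiest to reproduce in isolation. A minimal sketch, with hypothetical function names and a one-field `Order`:

```go
package main

import (
    "fmt"
    "strings"
)

// Order is a one-field stand-in for the pipeline's struct.
type Order struct{ Status string }

// lowerByValue looks like it mutates the slice, but o is a copy of each
// element, so the caller's slice is unchanged.
func lowerByValue(orders []Order) {
    for _, o := range orders {
        o.Status = strings.ToLower(o.Status)
    }
}

// lowerByIndex writes through the index, so the change is visible.
func lowerByIndex(orders []Order) {
    for i := range orders {
        orders[i].Status = strings.ToLower(orders[i].Status)
    }
}

func main() {
    orders := []Order{{Status: "PAID"}}

    lowerByValue(orders)
    fmt.Println(orders[0].Status) // prints: PAID

    lowerByIndex(orders)
    fmt.Println(orders[0].Status) // prints: paid
}
```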

Wrapping Up

Go ETL: faster, smaller, less flexible. Pick by team / workload. Friday: idempotent pipelines and watermarks — the discipline that prevents data duplication.