Building an ETL Pipeline in Python
October 10, 2022 · 4 min read · by Muhammad Amal · programming

TL;DR — Python ETL: SQLAlchemy for source DB, Polars (faster than pandas) for transforms, native client for destination. Pattern: read incremental, transform in DataFrame, write batched. Watermark in a metadata table. ~200 lines for a real pipeline.

The previous post covered Postgres logical replication; this one is the actual code. Python is the default ETL language, and the libraries are mature.

Why Python for ETL

  • Fast iteration; easy to debug
  • pandas / Polars for transforms
  • Native clients for every data platform (BigQuery, Snowflake, S3, Postgres)
  • No JVM to operate

The downsides (GIL, slower than Go for CPU-bound work) rarely matter for ETL — most time is in IO.

A complete pipeline

Sync orders from Postgres to BigQuery:

import os
from datetime import datetime, timezone

import polars as pl
import sqlalchemy
from google.cloud import bigquery

# Connections
pg = sqlalchemy.create_engine(
    os.environ['DATABASE_URL'],
    pool_pre_ping=True,
)
bq = bigquery.Client()

# Watermark in a metadata table
def get_watermark(table: str) -> datetime:
    with pg.connect() as conn:
        row = conn.execute(sqlalchemy.text(
            "SELECT high_watermark FROM etl_state WHERE table_name = :t"
        ), {"t": table}).fetchone()
        if row and row[0]:
            return row[0]
        return datetime(2020, 1, 1, tzinfo=timezone.utc)

def set_watermark(table: str, watermark: datetime):
    with pg.begin() as conn:
        conn.execute(sqlalchemy.text("""
            INSERT INTO etl_state (table_name, high_watermark, updated_at)
            VALUES (:t, :wm, NOW())
            ON CONFLICT (table_name) DO UPDATE
            SET high_watermark = EXCLUDED.high_watermark, updated_at = NOW()
        """), {"t": table, "wm": watermark})

# Extract
def extract_orders(since: datetime) -> pl.DataFrame:
    with pg.connect() as conn:
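        # read_database has no params argument; with a SQLAlchemy connection,
        # use :name binds and pass the values through execute_options.
        df = pl.read_database(
            query="""
                SELECT id, customer_id, status, total_cents, created_at, updated_at
                FROM orders
                WHERE updated_at > :since
                ORDER BY updated_at
                LIMIT 50000
            """,
            connection=conn,
            execute_options={"parameters": {"since": since}},
        )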
    return df

# Transform
def transform_orders(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns([
        (pl.col("total_cents") / 100).alias("total_dollars"),
        pl.col("status").str.to_lowercase().alias("status"),
        pl.col("created_at").dt.date().alias("created_date"),
    ])

# Load
def load_orders(df: pl.DataFrame):
    if df.is_empty():
        return
    pandas_df = df.to_pandas()
    table_ref = bq.dataset("analytics").table("orders")
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND",
        time_partitioning=bigquery.TimePartitioning(field="created_date"),
    )
    job = bq.load_table_from_dataframe(pandas_df, table_ref, job_config=job_config)
    job.result()

# Main
def sync():
    watermark = get_watermark("orders")
    df = extract_orders(watermark)
    if df.is_empty():
        print("no new rows")
        return

    df = transform_orders(df)
    load_orders(df)
    new_watermark = df["updated_at"].max()
    set_watermark("orders", new_watermark)
    print(f"synced {df.height} rows, new watermark: {new_watermark}")

if __name__ == "__main__":
    sync()

Under 100 lines for the core. Real production code. Cron entry to run it every 15 minutes: */15 * * * * python /opt/sync/orders.py.
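The watermark logic is easy to try without a Postgres instance. A minimal sketch, assuming an etl_state table shaped like the queries above imply (table_name as primary key), with the standard-library sqlite3 module standing in for Postgres. Timestamps are stored as ISO strings here, which is a SQLite convenience, not what the real pipeline does. The helpers take the connection as an argument, unlike the versions above.

```python
import sqlite3
from datetime import datetime, timezone

# Same fallback as the pipeline: start of history when no watermark exists.
DEFAULT_WATERMARK = datetime(2020, 1, 1, tzinfo=timezone.utc)


def make_state_db() -> sqlite3.Connection:
    """Create an in-memory stand-in for the etl_state metadata table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE etl_state (
            table_name     TEXT PRIMARY KEY,
            high_watermark TEXT,
            updated_at     TEXT
        )
        """
    )
    return conn


def get_watermark(conn: sqlite3.Connection, table: str) -> datetime:
    row = conn.execute(
        "SELECT high_watermark FROM etl_state WHERE table_name = ?", (table,)
    ).fetchone()
    if row and row[0]:
        return datetime.fromisoformat(row[0])
    return DEFAULT_WATERMARK


def set_watermark(conn: sqlite3.Connection, table: str, watermark: datetime) -> None:
    # "with conn" commits on success and rolls back on error.
    with conn:
        conn.execute(
            """
            INSERT INTO etl_state (table_name, high_watermark, updated_at)
            VALUES (?, ?, datetime('now'))
            ON CONFLICT (table_name) DO UPDATE
            SET high_watermark = excluded.high_watermark,
                updated_at = excluded.updated_at
            """,
            (table, watermark.isoformat()),
        )
```

The upsert keeps exactly one row per table, so the metadata table never grows with run count.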

Why Polars over pandas

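Polars (Rust-backed) is often 5-10× faster than pandas for typical ETL operations (filters, joins, group-bys), though the gap depends on the workload. The API is similar, so the learning curve is small.

  • Lazy evaluation (with the lazy API, operations build a query plan that runs only when collected)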
  • Multi-threaded by default
  • Better memory model (Arrow-based)
  • More predictable types (no silent NaN-ing of integers)

For 100MB datasets, pandas is fine. For 1GB+, Polars saves real time.

Connection pooling

The pool_pre_ping=True matters. Without it, connections that idled out (network glitch, DB restart) fail mysteriously on first use.

For high-frequency pipelines, also tune:

pg = sqlalchemy.create_engine(
    os.environ['DATABASE_URL'],
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600,
)

Error handling pattern

import logging
import sys
import time

logger = logging.getLogger(__name__)

def sync_with_retry(max_attempts=3):
    for attempt in range(max_attempts):
        try:
            sync()
            return
        except Exception as e:
            if attempt < max_attempts - 1:
                logger.warning(f"attempt {attempt+1} failed: {e}, retrying")
                time.sleep(2 ** attempt)
            else:
                logger.error(f"all attempts failed: {e}")
                # send_alert() is a placeholder for your Slack/pager hook;
                # exit non-zero so monitoring sees the failure
                send_alert(f"orders sync failed: {e}")
                sys.exit(1)

if __name__ == "__main__":
    sync_with_retry()

Exit non-zero on failure. Monitoring (next post in the month) picks up the non-zero exit.
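The retry loop is easier to test if it returns a result instead of calling sys.exit itself. A sketch of that variant; run_with_retry and its sleep parameter are hypothetical names, and the injectable sleep exists only so a test does not have to wait out the backoff.

```python
import logging
import time
from typing import Callable

logger = logging.getLogger(__name__)


def run_with_retry(
    task: Callable[[], None],
    max_attempts: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Run task, retrying with exponential backoff (1s, 2s, 4s, ...).

    Returns True on success, False once every attempt has failed.
    """
    for attempt in range(max_attempts):
        try:
            task()
            return True
        except Exception as exc:
            if attempt == max_attempts - 1:
                logger.error("all %d attempts failed: %s", max_attempts, exc)
                return False
            delay = 2 ** attempt
            logger.warning(
                "attempt %d failed: %s, retrying in %ds", attempt + 1, exc, delay
            )
            sleep(delay)
    return False  # only reached when max_attempts <= 0
```

In the script entry point this becomes sys.exit(0 if run_with_retry(sync) else 1), which keeps the non-zero exit code for monitoring.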

Batching for big tables

For tables with millions of rows on a backfill (extract_with_limit, transform, and load stand for per-table versions of the functions above):

def sync_chunked(table: str, batch_size: int = 50000):
    watermark = get_watermark(table)
    while True:
        df = extract_with_limit(watermark, batch_size)
        if df.is_empty():
            break
        df = transform(df)
        load(df)
        new_wm = df["updated_at"].max()
        set_watermark(table, new_wm)
        watermark = new_wm
        if df.height < batch_size:
            break  # last batch

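Loop until done. The watermark advances only after a successful load, so a crash resumes from the last committed watermark. With WRITE_APPEND, a crash between the load and the watermark update re-loads that batch, so deduplicate on id in the destination if you need strict idempotence.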
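One caveat with updated_at > watermark plus LIMIT: if several rows share the same updated_at across a batch boundary, the rest of that group is skipped on the next pass. A common fix is a composite (updated_at, id) cursor, which in Postgres would be WHERE (updated_at, id) > (:ts, :id) ORDER BY updated_at, id. The sketch below is a swap-in for the timestamp-only watermark, not what the pipeline above does; an in-memory list stands in for the source table and all names are hypothetical.

```python
from typing import Callable, List, Tuple

# Each row is (updated_at, id); plain ints stand in for timestamps.
Row = Tuple[int, int]


def fetch_batch(rows: List[Row], cursor: Row, batch_size: int) -> List[Row]:
    """Stand-in for the SQL query: rows strictly after the cursor, in order."""
    newer = sorted(r for r in rows if r > cursor)  # tuple comparison = row comparison
    return newer[:batch_size]


def sync_chunked(
    rows: List[Row],
    cursor: Row,
    batch_size: int,
    load: Callable[[List[Row]], None],
) -> Row:
    """Load rows in batches and return the final cursor."""
    while True:
        batch = fetch_batch(rows, cursor, batch_size)
        if not batch:
            break
        load(batch)
        cursor = batch[-1]  # advance only after the load succeeded
        if len(batch) < batch_size:
            break  # last batch
    return cursor
```

With a timestamp-only cursor and a batch size of 2, three rows sharing one updated_at would lose at least one row; the composite cursor picks them all up.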

Schema validation

Polars catches type mismatches; missing columns are a different issue. Validate explicitly:

REQUIRED_COLUMNS = {"id", "customer_id", "status", "total_cents", "created_at", "updated_at"}

def validate(df: pl.DataFrame):
    actual = set(df.columns)
    missing = REQUIRED_COLUMNS - actual
    if missing:
        raise ValueError(f"missing columns: {missing}")

Fail fast if the source DDL changed. Covered Oct 17 in the schema drift post.
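Because the extract query names its columns explicitly, a column added at the source never shows up in the DataFrame, so a missing-columns check alone cannot see it. A sketch of a two-way check against the source table's column list; how that list is fetched (for example from information_schema.columns) is left out and assumed, and check_source_columns is a hypothetical name.

```python
from typing import Iterable

# The columns the pipeline selects and transforms.
EXPECTED_COLUMNS = frozenset(
    {"id", "customer_id", "status", "total_cents", "created_at", "updated_at"}
)


def check_source_columns(source_columns: Iterable[str]) -> None:
    """Raise ValueError if the source table lost or gained columns."""
    actual = set(source_columns)
    missing = EXPECTED_COLUMNS - actual
    unexpected = actual - EXPECTED_COLUMNS

    problems = []
    if missing:
        problems.append(f"missing columns: {sorted(missing)}")
    if unexpected:
        problems.append(f"unexpected columns: {sorted(unexpected)}")
    if problems:
        raise ValueError("; ".join(problems))
```

Whether an unexpected column should fail the run or only log a warning is a policy choice; failing is the stricter default.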

Monitoring

The pipeline should emit metrics:

import time

from prometheus_client import CollectorRegistry, Counter, Gauge, push_to_gateway

# A dedicated registry so only this job's metrics are pushed
registry = CollectorRegistry()
rows_synced = Counter('etl_rows_synced_total', 'Rows synced', ['table'], registry=registry)
sync_duration = Gauge('etl_sync_duration_seconds', 'Sync duration', ['table'], registry=registry)
last_success_ts = Gauge('etl_last_success_timestamp', 'Last successful sync', ['table'], registry=registry)

def sync():
    start = time.time()
    # ... do work
    rows_synced.labels(table='orders').inc(df.height)
    sync_duration.labels(table='orders').set(time.time() - start)
    last_success_ts.labels(table='orders').set_to_current_time()
    push_to_gateway('pushgateway:9091', job='orders_sync', registry=registry)

Prometheus scrapes the Pushgateway; dashboards show pipeline health. Alert on time() - etl_last_success_timestamp > 3600.
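The alert rule is plain arithmetic on Unix timestamps. The same check in Python, for example in a local health-check script; is_stale is a hypothetical helper, not part of prometheus_client.

```python
import time
from typing import Optional

MAX_AGE_SECONDS = 3600  # same threshold as the alert rule


def is_stale(
    last_success_ts: float,
    now: Optional[float] = None,
    max_age: float = MAX_AGE_SECONDS,
) -> bool:
    """True when the last successful sync is older than max_age seconds."""
    if now is None:
        now = time.time()
    return now - last_success_ts > max_age
```

With a 15-minute cron schedule, a one-hour threshold tolerates three missed runs before firing.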

Common Pitfalls

No watermark. Re-fetches everything each run. Doesn’t scale.

Connection without pool_pre_ping. Mysterious failures after idle period.

pandas with 10M rows. OOMs. Use Polars or batch.

Silent column drops. Source DDL adds new column; pipeline drops it because no validation. Add explicit allowed-columns check.

Watermark advanced before successful load. Crash leaves data not loaded but watermark moved. Move watermark in same transaction as load if possible, otherwise after successful load.

Loading directly without staging. Big bulk loads + queries on the destination at the same time → contention. Stage to a temp table, then swap.
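Several of these pitfalls come down to making a re-run harmless. An append-style load writes a batch again when it is retried, so one common approach is to keep only the newest version of each row when reading the loaded data. A sketch in plain Python with dicts as rows; in practice this logic would more likely live in SQL on the destination. latest_per_id is a hypothetical name.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def latest_per_id(rows: Iterable[Row]) -> List[Row]:
    """Keep one row per id: the one with the highest updated_at.

    On a tie the later row in the input wins, so exact duplicates collapse.
    """
    latest: Dict[Any, Row] = {}
    for row in rows:
        current = latest.get(row["id"])
        if current is None or row["updated_at"] >= current["updated_at"]:
            latest[row["id"]] = row
    return sorted(latest.values(), key=lambda r: r["id"])
```

This also handles the normal case where an order is updated and synced twice, not only crash-induced duplicates.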

Wrapping Up

Python + Polars + SQLAlchemy + native client = a real ETL pipeline in ~200 lines. Wednesday: the Go variant.