Schema Drift, Handling Source Changes Without Breaking Pipelines
October 17, 2022 · 4 min read · by Muhammad Amal programming

TL;DR — Schema drift = source DDL changes; pipeline misses or breaks. Detect via comparing source schema to expected. Additive changes (new column): auto-extend destination. Breaking (renamed, dropped, type changed): fail loudly, alert, require human action. Never silently drop columns.

After idempotency, the other failure mode that haunts pipelines: schema drift. Source teams change DDL; pipelines either silently drop data or crash.

What drift looks like

Common changes:

  • Additive: new column added. Existing pipeline ignores it; data lost.
  • Renamed: column created → created_at. Pipeline fails: no column created.
  • Dropped: column removed. Pipeline fails: no column customer_legacy_id.
  • Type changed: integer → bigint, or varchar(50) → text. Casts may fail or truncate.
  • Constraint changed: NOT NULL added retroactively. Backfill might violate.

Each is a different operational scenario. The pipeline should handle each differently.

Detection — compare schemas

Before each run, query the source’s current schema:

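from sqlalchemy import text

def get_source_schema(table: str) -> dict[str, str]:
    """Return {column_name: data_type} for a table in the public schema."""
    # Assumes `pg` is an open SQLAlchemy connection to the source database;
    # the :t named placeholder only works when the SQL is wrapped in text().
    rows = pg.execute(text("""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = :t
        ORDER BY ordinal_position
    """), {"t": table}).fetchall()
    return {r[0]: r[1] for r in rows}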

Cache the expected schema in a config file or metadata table:

# expected-schemas.yaml
orders:
  id: bigint
  customer_id: bigint
  status: text
  total_cents: bigint
  created_at: timestamp with time zone
  updated_at: timestamp with time zone

Compare:

def diff_schemas(expected, actual):
    expected_cols = set(expected.keys())
    actual_cols = set(actual.keys())
    return {
        "added":   actual_cols - expected_cols,
        "removed": expected_cols - actual_cols,
        "type_changed": {
            col: (expected[col], actual[col])
            for col in expected_cols & actual_cols
            if expected[col] != actual[col]
        }
    }

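This returns a delta; the pipeline decides what to do with it. A rename is not reported separately: it shows up as one removed column plus one added column, so the removed check catches it.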
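A worked example makes the shape of the delta concrete. The schemas below are made up for illustration, and diff_schemas is repeated only so the snippet runs on its own:

```python
def diff_schemas(expected: dict[str, str], actual: dict[str, str]) -> dict:
    """Same comparison as above, repeated so this example is self-contained."""
    expected_cols = set(expected)
    actual_cols = set(actual)
    return {
        "added": actual_cols - expected_cols,
        "removed": expected_cols - actual_cols,
        "type_changed": {
            col: (expected[col], actual[col])
            for col in expected_cols & actual_cols
            if expected[col] != actual[col]
        },
    }

# Hypothetical schemas: the source renamed `created` to `created_at`,
# widened `id`, and added `coupon_code`.
expected = {"id": "integer", "status": "text", "created": "timestamp with time zone"}
actual = {
    "id": "bigint",
    "status": "text",
    "created_at": "timestamp with time zone",
    "coupon_code": "text",
}

diff = diff_schemas(expected, actual)
print(sorted(diff["added"]))    # ['coupon_code', 'created_at']
print(sorted(diff["removed"]))  # ['created']
print(diff["type_changed"])     # {'id': ('integer', 'bigint')}
```

The renamed column appears in both "added" and "removed", which is why a removed column should always send a human to look at the source DDL.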

Policy per change type

Additive (new column):

Auto-include OR ignore-but-log. Two valid approaches:

if diff["added"]:
    log.info(f"new columns: {diff['added']}; including in extract")
    # Extract everything, add to destination DDL if needed

OR:

if diff["added"]:
    log.warning(f"new columns not in pipeline config: {diff['added']}")
    send_alert("Schema drift: new columns; review and update config")
    # Continue without new columns

Auto-include is convenient but means destination schema changes silently. I lean toward log + alert; let a human decide.

Removed columns:

if diff["removed"]:
    log.error(f"columns removed: {diff['removed']}; cannot continue without action")
    send_alert("Schema drift: columns removed; pipeline halted")
    sys.exit(1)

Fail loudly. Re-run only after a human reviews the change and updates the pipeline’s expected schema.

Type changed:

Treat this as a variant of removed: log, alert, halt. Type changes are often safe (varchar to text) but sometimes destructive (bigint to int, where large values no longer fit). Don’t assume which kind you have.
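One way to make the alert more useful to the reviewer is to label each type change before halting. This is a rough sketch, not a complete rule set: SAFE_WIDENINGS and split_type_changes are hypothetical names, and the allowlist is an assumption you would tune to your own sources. The pipeline still halts either way; the labels only tell the human where to look first.

```python
# Hypothetical allowlist of widening changes, keyed by (old_type, new_type)
# using the data_type strings that information_schema.columns reports.
SAFE_WIDENINGS = {
    ("smallint", "integer"),
    ("smallint", "bigint"),
    ("integer", "bigint"),
    ("character varying", "text"),
}

def split_type_changes(type_changed: dict[str, tuple[str, str]]):
    """Partition {col: (old, new)} into (widening, needs_review)."""
    widening, needs_review = {}, {}
    for col, (old, new) in type_changed.items():
        # Anything not explicitly allowlisted is treated as risky.
        target = widening if (old, new) in SAFE_WIDENINGS else needs_review
        target[col] = (old, new)
    return widening, needs_review

changes = {
    "id": ("integer", "bigint"),           # widening: every old value still fits
    "total_cents": ("bigint", "integer"),  # narrowing: large values may overflow
}
widening, needs_review = split_type_changes(changes)
print(widening)      # {'id': ('integer', 'bigint')}
print(needs_review)  # {'total_cents': ('bigint', 'integer')}
```

Even a widening on the source can break a destination column that is still the narrower type, so this is a triage aid rather than a reason to skip the halt.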

Handling new columns destination-side

For BigQuery / data warehouses, schema is rigid. Adding a column requires ALTER TABLE:

def evolve_destination_schema(table: str, new_columns: dict[str, str]):
    for col, type_ in new_columns.items():
        bq_type = pg_to_bq_type(type_)
        bq.execute(f"ALTER TABLE analytics.{table} ADD COLUMN IF NOT EXISTS {col} {bq_type}")

For Postgres destination:

for col, type_ in new_columns.items():
    pg.execute(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {col} {type_}")

IF NOT EXISTS makes it idempotent.

Do this only if your policy is “auto-include.” Otherwise it’s a manual migration.
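The BigQuery snippet above calls pg_to_bq_type without defining it. A minimal sketch of what it could look like, assuming the input is a data_type string from information_schema.columns; the PG_TO_BQ table is illustrative and deliberately incomplete:

```python
# Hypothetical mapping from Postgres data_type strings to BigQuery column
# types. Extend it for the types your sources actually use.
PG_TO_BQ = {
    "smallint": "INT64",
    "integer": "INT64",
    "bigint": "INT64",
    "numeric": "NUMERIC",  # very wide numerics may need BIGNUMERIC instead
    "real": "FLOAT64",
    "double precision": "FLOAT64",
    "boolean": "BOOL",
    "text": "STRING",
    "character varying": "STRING",
    "date": "DATE",
    "timestamp with time zone": "TIMESTAMP",
    "timestamp without time zone": "DATETIME",
}

def pg_to_bq_type(pg_type: str) -> str:
    """Translate a Postgres type name, failing loudly on anything unmapped."""
    try:
        return PG_TO_BQ[pg_type.lower()]
    except KeyError:
        raise ValueError(f"no BigQuery mapping for Postgres type {pg_type!r}") from None

print(pg_to_bq_type("timestamp with time zone"))  # TIMESTAMP
```

Raising on an unknown type, instead of falling back to STRING, keeps auto-evolution from quietly creating a column with the wrong type.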

Catching breakage in CI

Schema drift detection can run before deployment:

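import sys

def check_schema():
    expected = load_yaml("expected-schemas.yaml")
    failed = False
    for table, exp in expected.items():
        actual = get_source_schema(table)
        diff = diff_schemas(exp, actual)
        if diff["removed"] or diff["type_changed"]:
            # Report every drifted table before failing, not just the first.
            print(f"{table}: breaking drift: {diff}", file=sys.stderr)
            failed = True
    if failed:
        sys.exit(1)
    return 0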

# In CI: nightly cron runs this against production source DB

This catches drift between scheduled syncs and alerts before the production pipeline fails.

The schema-as-code pattern

Store expected schemas in version control. Pipeline config changes become PRs, reviewable like code:

etl-pipeline/
├── schemas/
│   ├── orders.yaml
│   ├── customers.yaml
│   └── products.yaml
├── pipeline.py
└── README.md

Each PR updating a schema goes through review. Eyeballs see “we’re adding column X.” Optionally, drift detection can open these PRs automatically.
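For the auto-generated variant, the detection job needs to turn the live schema into the file content a PR would propose. A small sketch, assuming the flat table-then-columns YAML layout shown earlier; render_expected_schema is a hypothetical helper, and opening the PR itself is left to whatever tooling you already use:

```python
def render_expected_schema(table: str, columns: dict[str, str]) -> str:
    """Render {column: data_type} as YAML text in the expected-schema layout."""
    lines = [f"{table}:"]
    # Dicts preserve insertion order, so columns keep their source ordering.
    lines += [f"  {col}: {data_type}" for col, data_type in columns.items()]
    return "\n".join(lines) + "\n"

# Hypothetical live schema, as get_source_schema might return it.
actual = {
    "id": "bigint",
    "status": "text",
    "created_at": "timestamp with time zone",
}
print(render_expected_schema("orders", actual), end="")
```

Writing the file by hand like this only works because column names and Postgres type names are plain YAML scalars; anything more complex should go through a real YAML library.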

The “raw column” trick

For pipelines feeding analytical warehouses, some teams keep all source data as a single JSON column plus extracted/typed columns:

CREATE TABLE orders (
  id bigint,
  status text,
  total_cents bigint,
  created_at timestamp,
  updated_at timestamp,
  raw_source jsonb       -- everything else
);

New columns silently land in raw_source. Typed columns evolve deliberately. Analysts query both.

For BigQuery, use a RECORD or STRING column with JSON content.
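On the load side, the split can be done per row before writing. A minimal sketch, assuming each source row arrives as a dict and following the "everything else" reading of the DDL comment above; TYPED_COLUMNS and to_destination_row are hypothetical names:

```python
import json

# Columns that have a typed home in the destination table.
TYPED_COLUMNS = ("id", "status", "total_cents", "created_at", "updated_at")

def to_destination_row(source_row: dict) -> dict:
    """Split a source row into typed columns plus a JSON raw_source blob."""
    row = {col: source_row.get(col) for col in TYPED_COLUMNS}
    extras = {k: v for k, v in source_row.items() if k not in TYPED_COLUMNS}
    # default=str keeps non-JSON values (datetimes, decimals) from raising.
    row["raw_source"] = json.dumps(extras, default=str, sort_keys=True)
    return row

source_row = {
    "id": 1,
    "status": "paid",
    "total_cents": 4200,
    "created_at": "2022-10-17T09:00:00+00:00",
    "updated_at": "2022-10-17T09:05:00+00:00",
    "coupon_code": "SAVE10",  # new source column the pipeline has never seen
}
row = to_destination_row(source_row)
print(row["raw_source"])  # {"coupon_code": "SAVE10"}
```

The extract has to select every source column for this to work; a hard-coded column list upstream would drop the new field before it ever reaches raw_source.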

Pros:

  • Never lose data on additive changes
  • Querying new columns is awkward (raw_source->>'new_field') — keeps surface area pressure low

Cons:

  • Storage cost
  • Slightly slower queries
  • Some teams hate it on aesthetic grounds

For shops where source teams ship without ETL team awareness, the raw column saves data that would otherwise be lost.

Common Pitfalls

No drift detection. Pipeline silently loses new columns; a quarter later, “where did the new feature’s data go?”

Auto-evolution without alerting. Destination schema mutates without anyone noticing; broken downstream queries discovered randomly.

Hard failure on additive changes. Pipeline halts because a column was added; data team gets paged for harmless changes.

Same response to all drift. New column vs dropped column vs type change need different responses.

Expected schema not version controlled. “We thought it was set to X.” Without git history, nobody can prove anything.

Multiple pipelines polling same source, drift detected by only one. Coordinate drift detection at the source-schema level.

Wrapping Up

Detect drift before each run; auto-handle additive changes; halt and alert on breaking changes, type changes included; commit expected schemas to git. Wednesday: Kafka as sync backbone.