Schema Drift: Handling Source Changes Without Breaking Pipelines
TL;DR — Schema drift = source DDL changes; pipeline misses or breaks. Detect via comparing source schema to expected. Additive changes (new column): auto-extend destination. Breaking (renamed, dropped, type changed): fail loudly, alert, require human action. Never silently drop columns.
After idempotency, the other failure mode that haunts pipelines: schema drift. Source teams change DDL; pipelines either silently drop data or crash.
What drift looks like
Common changes:
- Additive: new column added. Existing pipeline ignores it; data lost.
- Renamed: column created → created_at. Pipeline fails: no column named created.
- Dropped: column removed. Pipeline fails: no column customer_legacy_id.
- Type changed: integer → bigint, or varchar(50) → text. Casts may fail or truncate.
- Constraint changed: NOT NULL added retroactively. Backfill might violate it.
Each is a different operational scenario. The pipeline should handle each differently.
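One way to make "handle each differently" explicit is a lookup table from change kind to response. This is a minimal sketch, not the post's own code: DriftKind, Action, POLICY and action_for are hypothetical names, and the mapping follows the policy argued for below (alert but continue on additions, halt on anything that can lose or corrupt data). Constraint changes are left out because the post sets no policy for them.

```python
from enum import Enum
from typing import Optional


class DriftKind(Enum):
    ADDED = "added"
    REMOVED = "removed"
    RENAMED = "renamed"
    TYPE_CHANGED = "type_changed"


class Action(Enum):
    CONTINUE_AND_ALERT = "continue_and_alert"  # harmless, but a human should know
    HALT_AND_ALERT = "halt_and_alert"          # stop the run until someone acts


# Hypothetical policy table: additions never stop the run,
# anything that can lose or corrupt data does.
POLICY: dict[DriftKind, Action] = {
    DriftKind.ADDED: Action.CONTINUE_AND_ALERT,
    DriftKind.REMOVED: Action.HALT_AND_ALERT,
    DriftKind.RENAMED: Action.HALT_AND_ALERT,
    DriftKind.TYPE_CHANGED: Action.HALT_AND_ALERT,
}


def action_for(kinds: set[DriftKind]) -> Optional[Action]:
    """Return the most severe action required by the observed changes."""
    if not kinds:
        return None
    if any(POLICY[k] is Action.HALT_AND_ALERT for k in kinds):
        return Action.HALT_AND_ALERT
    return Action.CONTINUE_AND_ALERT
```

Taking the most severe action means a run that sees both a new column and a dropped one halts, rather than half-proceeding.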
Detection — compare schemas
Before each run, query the source’s current schema:
```python
from sqlalchemy import text

# pg: an open SQLAlchemy connection to the source database (assumed)
def get_source_schema(table: str) -> dict[str, str]:
    """Return {column_name: data_type} for public.<table>, in column order."""
    rows = pg.execute(text("""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = 'public' AND table_name = :t
        ORDER BY ordinal_position
    """), {"t": table}).fetchall()
    return {r[0]: r[1] for r in rows}
```
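One caveat: information_schema.columns.data_type reports varchar(50) simply as character varying, so a length change such as varchar(50) to varchar(255) never shows up in the comparison. A minimal sketch of folding length and precision back into the type string, assuming the query above is extended to also select character_maximum_length, numeric_precision and numeric_scale. format_pg_type is a hypothetical helper, and the expected-schema file would then have to record types in the same format.

```python
from typing import Optional


def format_pg_type(
    data_type: str,
    char_max_len: Optional[int],
    numeric_precision: Optional[int],
    numeric_scale: Optional[int],
) -> str:
    """Append length/precision so those changes take part in the schema diff."""
    if data_type in ("character varying", "character") and char_max_len is not None:
        return f"{data_type}({char_max_len})"
    # Integer types also report numeric_precision, so only decorate numeric itself.
    if data_type == "numeric" and numeric_precision is not None:
        return f"numeric({numeric_precision},{numeric_scale or 0})"
    return data_type
```

With the extra columns selected, the return line becomes something like {r[0]: format_pg_type(*r[1:]) for r in rows}.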
Record the expected schema in a config file or metadata table:
```yaml
# expected-schemas.yaml
orders:
  id: bigint
  customer_id: bigint
  status: text
  total_cents: bigint
  created_at: timestamp with time zone
  updated_at: timestamp with time zone
```
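For the metadata-table option, the same mapping fits in a three-column table. A minimal sketch using Python's built-in sqlite3 as a stand-in for whatever metadata store you actually run; the expected_columns table and both function names are hypothetical. The loader returns the same {column: type} shape as get_source_schema, so the two can be compared directly.

```python
import sqlite3


def init_expected_schema_table(conn: sqlite3.Connection) -> None:
    """Create the (hypothetical) metadata table if it does not exist yet."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS expected_columns (
            table_name  TEXT NOT NULL,
            column_name TEXT NOT NULL,
            data_type   TEXT NOT NULL,
            PRIMARY KEY (table_name, column_name)
        )
        """
    )


def load_expected_schema(conn: sqlite3.Connection, table: str) -> dict[str, str]:
    """Return {column_name: data_type} for one table; empty if unknown."""
    rows = conn.execute(
        "SELECT column_name, data_type FROM expected_columns WHERE table_name = ?",
        (table,),
    ).fetchall()
    return {name: dtype for name, dtype in rows}
```

A metadata table is easy to update programmatically; a YAML file in git gives you review history for free. Pick whichever matches how your team approves schema changes.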
Compare:
```python
def diff_schemas(expected, actual):
    expected_cols = set(expected.keys())
    actual_cols = set(actual.keys())
    return {
        "added": actual_cols - expected_cols,
        "removed": expected_cols - actual_cols,
        "type_changed": {
            col: (expected[col], actual[col])
            for col in expected_cols & actual_cols
            if expected[col] != actual[col]
        }
    }
```
Returns a delta. Pipeline decides what to do.
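Note that diff_schemas has no notion of a rename: created → created_at arrives as one entry in removed plus one in added. A minimal sketch of a heuristic that pairs them by matching type, so an alert can say "possible rename". rename_candidates is a hypothetical helper, and matching on type alone yields false positives whenever several columns share a type, so it should only inform the human, never decide.

```python
def rename_candidates(
    expected: dict[str, str], actual: dict[str, str]
) -> list[tuple[str, str]]:
    """Pair each removed column with added columns of the same type.

    A rename surfaces as one removed plus one added column; these pairs
    are suggestions for a human to confirm, not conclusions.
    """
    removed = sorted(set(expected) - set(actual))
    added = sorted(set(actual) - set(expected))
    return [
        (old, new)
        for old in removed
        for new in added
        if expected[old] == actual[new]
    ]
```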
Policy per change type
Additive (new column):
Auto-include OR ignore-but-log. Two valid approaches:
```python
if diff["added"]:
    log.info(f"new columns: {diff['added']}; including in extract")
    # Extract everything, add to destination DDL if needed
```
OR:
```python
if diff["added"]:
    log.warning(f"new columns not in pipeline config: {diff['added']}")
    send_alert("Schema drift: new columns; review and update config")
    # Continue without new columns
```
Auto-include is convenient but means destination schema changes silently. I lean toward log + alert; let a human decide.
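Both approaches can sit behind a single flag. A minimal sketch; columns_to_extract and the alert callback are hypothetical stand-ins for your own log and send_alert plumbing. The auto-include branch still alerts, so the destination never changes without anyone hearing about it.

```python
import logging
from typing import Callable

log = logging.getLogger("etl.schema_drift")


def columns_to_extract(
    configured: list[str],
    added: set[str],
    auto_include: bool,
    alert: Callable[[str], None],
) -> list[str]:
    """Decide which columns the extract selects after additive drift."""
    if not added:
        return list(configured)
    new_cols = sorted(added)
    if auto_include:
        log.info("new columns %s; including in extract", new_cols)
        alert(f"Schema drift: auto-included new columns {new_cols}")
        return list(configured) + new_cols
    # Log + alert: keep extracting the configured columns until a human decides.
    log.warning("new columns not in pipeline config: %s", new_cols)
    alert("Schema drift: new columns; review and update config")
    return list(configured)
```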
Removed columns:
```python
import sys

if diff["removed"]:
    log.error(f"columns removed: {diff['removed']}; cannot continue without action")
    send_alert("Schema drift: columns removed; pipeline halted")
    sys.exit(1)
```
Fail loudly. Re-run only after a human reviews the change and updates the pipeline's expected schema.
Type changed:
Treat it like a removed column: log, alert, halt. Type changes are often safe (varchar to text) but sometimes destructive (bigint to int). Don't assume which.
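Removed and retyped columns get the same response, so one check can cover both. A minimal sketch that raises an exception instead of calling sys.exit(1), assuming your scheduler marks a run as failed on an uncaught exception; SchemaDriftError and enforce_breaking_policy are hypothetical names, and diff is the dict returned by diff_schemas.

```python
class SchemaDriftError(RuntimeError):
    """Raised when drift needs a human before the pipeline may run again."""


def enforce_breaking_policy(table: str, diff: dict) -> None:
    """Halt on removed or retyped columns; additive drift is handled elsewhere."""
    problems = []
    if diff.get("removed"):
        problems.append(f"removed {sorted(diff['removed'])}")
    for col, (old, new) in sorted(diff.get("type_changed", {}).items()):
        problems.append(f"{col}: {old} -> {new}")
    if problems:
        raise SchemaDriftError(f"{table}: " + "; ".join(problems))
```

Raising keeps the halt testable and lets the caller decide how to alert; the error message doubles as the alert text.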
Handling new columns destination-side
For BigQuery and other data warehouses, the destination schema is rigid; adding a column requires ALTER TABLE:
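```python
from google.cloud import bigquery

bq = bigquery.Client()

def evolve_destination_schema(table: str, new_columns: dict[str, str]):
    for col, type_ in new_columns.items():
        bq_type = pg_to_bq_type(type_)  # Postgres type name -> BigQuery type
        # client.query() starts a DDL job; .result() blocks until it finishes
        bq.query(f"ALTER TABLE analytics.{table} ADD COLUMN IF NOT EXISTS {col} {bq_type}").result()
```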
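The pg_to_bq_type helper is not defined in the snippet. A minimal sketch of one plausible mapping from information_schema data_type names to BigQuery column types; the table is an assumption to adapt to your sources, not an official or exhaustive mapping.

```python
# Hypothetical mapping; extend it for the types your sources actually use.
PG_TO_BQ = {
    "smallint": "INT64",
    "integer": "INT64",
    "bigint": "INT64",
    "boolean": "BOOL",
    "real": "FLOAT64",
    "double precision": "FLOAT64",
    "numeric": "BIGNUMERIC",  # Postgres numeric can exceed NUMERIC's 38 digits / 9 decimals
    "text": "STRING",
    "character varying": "STRING",
    "uuid": "STRING",
    "date": "DATE",
    "timestamp without time zone": "DATETIME",
    "timestamp with time zone": "TIMESTAMP",
    "json": "JSON",
    "jsonb": "JSON",
    "bytea": "BYTES",
}


def pg_to_bq_type(pg_type: str) -> str:
    """Map a Postgres type name (length/precision ignored) to a BigQuery type."""
    base = pg_type.split("(", 1)[0].strip()
    try:
        return PG_TO_BQ[base]
    except KeyError:
        # Fail loudly rather than guess a lossy type.
        raise ValueError(f"no BigQuery mapping for Postgres type {pg_type!r}") from None
```

Raising on unknown types fits the post's policy: an unmapped type is a decision for a human, not for the pipeline.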
For Postgres destination:
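```python
from sqlalchemy import text  # pg: SQLAlchemy connection to the destination (assumed)

for col, type_ in new_columns.items():
    pg.execute(text(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {col} {type_}"))
```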
IF NOT EXISTS makes it idempotent.
Do this only if your policy is “auto-include.” Otherwise it’s a manual migration.
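Both ALTER snippets paste source column names straight into SQL. Because those names come from another team's DDL, it may be worth validating them before they reach an ALTER. A minimal sketch, assuming lower-case unquoted identifiers are the norm at your source; add_column_ddl is a hypothetical helper, and col_type is expected to come from a trusted mapping such as pg_to_bq_type rather than raw input.

```python
import re

# Conservative allowlist: lower-case, unquoted identifiers up to 63 chars.
_IDENT = re.compile(r"[a-z_][a-z0-9_]{0,62}")


def add_column_ddl(dialect: str, table: str, column: str, col_type: str) -> str:
    """Build an idempotent ADD COLUMN statement after validating identifiers."""
    for name in (table, column):
        if not _IDENT.fullmatch(name):
            raise ValueError(f"refusing to interpolate identifier {name!r}")
    if dialect == "bigquery":
        return f"ALTER TABLE `analytics.{table}` ADD COLUMN IF NOT EXISTS `{column}` {col_type}"
    if dialect == "postgres":
        return f'ALTER TABLE "{table}" ADD COLUMN IF NOT EXISTS "{column}" {col_type}'
    raise ValueError(f"unknown dialect {dialect!r}")
```

Anything outside the allowlist becomes one more reason to stop and ask a human, which matches the halt-on-surprise policy above.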
Catching breakage in CI
Schema drift detection can also run outside the pipeline, for example before deployment or on a nightly schedule:
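```python
import sys
import yaml  # PyYAML

def check_schema() -> int:
    with open("expected-schemas.yaml") as f:
        expected = yaml.safe_load(f)
    failed = False  # check every table before exiting, so one run reports all drift
    for table, exp in expected.items():
        diff = diff_schemas(exp, get_source_schema(table))
        if diff["removed"] or diff["type_changed"]:
            print(f"{table}: breaking drift {diff}", file=sys.stderr)
            failed = True
    return 1 if failed else 0

# In CI: a nightly cron runs this against the production source DB
if __name__ == "__main__":
    sys.exit(check_schema())
```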
This catches drift between scheduled syncs and alerts you before the production pipeline fails.
The schema-as-code pattern
Store expected schemas in version control. Pipeline config changes are PRs reviewable like code:
etl-pipeline/
├── schemas/
│ ├── orders.yaml
│ ├── customers.yaml
│ └── products.yaml
├── pipeline.py
└── README.md
Each PR that updates a schema goes through review, so reviewers see "we're adding column X." Optionally, drift detection can open these PRs automatically.
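For auto-generated PRs, the drift detector only needs to rewrite the schema file; committing it and opening the PR depends on your Git hosting and is left out here. A minimal sketch that writes the observed source schema in the schemas/<table>.yaml layout shown above; render_schema_yaml and write_proposed_schema are hypothetical, and the hand-rolled YAML assumes plain column and type names that need no quoting.

```python
from pathlib import Path


def render_schema_yaml(table: str, columns: dict[str, str]) -> str:
    """Render {column: type} as a one-table YAML mapping."""
    lines = [f"{table}:"]
    lines += [f"  {name}: {dtype}" for name, dtype in columns.items()]
    return "\n".join(lines) + "\n"


def write_proposed_schema(root: Path, table: str, actual: dict[str, str]) -> Path:
    """Overwrite schemas/<table>.yaml with the schema observed at the source."""
    path = root / "schemas" / f"{table}.yaml"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(render_schema_yaml(table, actual))
    return path
```

The resulting git diff is exactly what a reviewer needs to see: which columns appeared, vanished, or changed type.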
The “raw column” trick
For pipelines feeding analytical warehouses, some teams keep all source data as a single JSON column plus extracted/typed columns:
```sql
CREATE TABLE orders (
    id bigint,
    status text,
    total_cents bigint,
    created_at timestamp,
    updated_at timestamp,
    raw_source jsonb -- everything else
);
```
New columns silently land in raw_source, while typed columns are evolved deliberately. Analysts query both.
For BigQuery, use a JSON column (or a STRING column holding JSON text). A RECORD needs its fields declared up front, so it cannot absorb unknown columns.
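On the load side, the trick amounts to splitting each extracted row. A minimal sketch, assuming rows arrive as dicts keyed by column name and following the CREATE TABLE comment above (raw_source holds everything not already typed); split_row and TYPED_COLUMNS are hypothetical names, and some teams put the full row in raw_source instead.

```python
import json
from typing import Any

# Typed columns of the destination table above; everything else goes to raw_source.
TYPED_COLUMNS = ("id", "status", "total_cents", "created_at", "updated_at")


def split_row(row: dict[str, Any]) -> dict[str, Any]:
    """Map a source row onto typed columns plus a raw_source JSON string.

    Columns missing from TYPED_COLUMNS, including ones added since the
    pipeline was written, land in raw_source instead of being dropped.
    """
    out = {col: row.get(col) for col in TYPED_COLUMNS}
    extra = {k: v for k, v in row.items() if k not in TYPED_COLUMNS}
    # default=str keeps datetimes and Decimals serializable.
    out["raw_source"] = json.dumps(extra, default=str, sort_keys=True)
    return out
```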
Pros:
- Never lose data on additive changes
- New fields stay queryable, if awkwardly (raw_source->>'new_field'), which reduces pressure to promote every new column to a typed one right away
Cons:
- Storage cost
- Slightly slower queries
- Some teams hate it on aesthetic grounds
For shops where source teams ship without ETL team awareness, the raw column saves data that would otherwise be lost.
Common Pitfalls
No drift detection. Pipeline silently loses new columns; a quarter later, someone asks "where did the new feature's data go?"
Auto-evolution without alerting. Destination schema mutates without anyone noticing; broken downstream queries discovered randomly.
Hard failure on additive changes. Pipeline halts because a column was added; data team gets paged for harmless changes.
Same response to all drift. New column vs dropped column vs type change need different responses.
Expected schema not version controlled. “We thought it was set to X.” Without git history, nobody can prove anything.
Multiple pipelines polling the same source, with drift detected by only one. Coordinate drift detection at the source-schema level.
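One way to coordinate: a single detector records a fingerprint of each source table's schema in a shared place, and every pipeline compares its expected fingerprint against it before running. A minimal sketch of the fingerprint only; schema_fingerprint is hypothetical, and where fingerprints are stored is left open. Sorting makes it ignore column order, so a pure reorder is not flagged; drop the sort if ordinal position matters to you (for example, with SELECT * extracts).

```python
import hashlib
import json


def schema_fingerprint(columns: dict[str, str]) -> str:
    """Stable SHA-256 of {column: type}; equal schemas give equal fingerprints."""
    canonical = json.dumps(sorted(columns.items()), separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```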
Wrapping Up
Detect drift before each run; auto-handle additive; halt on breaking; alert on type changes; commit expected schemas to git. Wednesday: Kafka as sync backbone.