Idempotent Pipelines and Watermarks
TL;DR: Idempotent pipelines produce the same destination state no matter how many times they run. A watermark table tracks progress. The destination is written with upserts (or deduplicated on read), not blind appends. The watermark advances only after a successful load. Three patterns cover nearly all cases.
After the Go ETL post, the next question is discipline. Pipelines crash, and re-runs happen. Without idempotency, every re-run risks duplicate rows. With it, re-runs are harmless enough that you don’t notice them.
The problem
Naive pipeline:
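# Extract rows updated in the last 2 hours (pg and bq are illustrative clients)
rows = pg.query("SELECT * FROM orders WHERE updated_at > now() - interval '2 hours'")
# Append to the destination: nothing stops a row from landing twice
bq.append("orders", rows)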
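Run it twice (cron fires twice, or you re-run after a crash) and the destination has each row twice. Even without failures, a cron that fires hourly with this 2-hour window extracts every row in two consecutive runs.
Many real production data-integrity issues stem from this pattern. Teams often discover it months in, through duplicate-count reports.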
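Here is a minimal sketch of the failure. It uses an in-memory SQLite table as a stand-in for the destination. That is an assumption for the demo, since the example above targets BigQuery. The naive_append helper is hypothetical.

```python
import sqlite3

def naive_append(conn, rows):
    # Plain INSERT with no key: nothing stops the same row from landing twice
    with conn:
        conn.executemany("INSERT INTO orders (id, status) VALUES (?, ?)", rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT)")  # append-only, no primary key

batch = [(1, "paid"), (2, "shipped")]
naive_append(conn, batch)  # first run
naive_append(conn, batch)  # cron fires again, or a re-run after a crash

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # 4: every row is now in the destination twice
```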
The watermark table
A small Postgres table tracks “what’s the high-water mark per source table”:
CREATE TABLE etl_state (
table_name text PRIMARY KEY,
high_watermark timestamptz NOT NULL,
rows_synced bigint DEFAULT 0,
updated_at timestamptz DEFAULT now()
);
Pipeline:
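def sync(table):
    wm = get_watermark(table)          # last successfully loaded updated_at
    rows = extract_since(table, wm)    # rows with updated_at > wm
    if not rows:
        return                         # nothing new: leave the watermark alone
    load(rows)                         # must be idempotent (upsert, dedup, ...)
    new_wm = max(r['updated_at'] for r in rows)
    set_watermark(table, new_wm)       # only reached if load() did not raise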
The watermark advances only after the load succeeds. If the process crashes between the load and the watermark update, the next run re-loads those rows. That means the destination itself must be idempotent.
Destination idempotency — three patterns
Pattern A: Upsert / MERGE.
INSERT INTO orders (id, status, updated_at, ...)
VALUES ($1, $2, $3, ...)
ON CONFLICT (id) DO UPDATE SET
status = EXCLUDED.status,
updated_at = EXCLUDED.updated_at,
...
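Each row has a unique key, so a re-load updates the existing row instead of inserting a new one. Loading the same row any number of times yields the same final state. In Postgres, ON CONFLICT (id) requires a unique index or constraint on id.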
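To check the claim, this sketch loads one batch three times through an upsert and compares snapshots. It uses SQLite, whose INSERT ... ON CONFLICT ... DO UPDATE syntax mirrors Postgres (SQLite 3.24 or later). The table layout is assumed.

```python
import sqlite3

# ON CONFLICT (id) needs a primary key or unique index on id to fire
UPSERT = """
INSERT INTO orders (id, status, updated_at) VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
    status = excluded.status,
    updated_at = excluded.updated_at
"""

def load(conn, rows):
    with conn:  # commit the batch
        conn.executemany(UPSERT, rows)

def snapshot(conn):
    return conn.execute("SELECT id, status, updated_at FROM orders ORDER BY id").fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)")

batch = [(1, "paid", "2024-01-01T10:00:00"), (2, "new", "2024-01-01T10:05:00")]
load(conn, batch)
after_first = snapshot(conn)
load(conn, batch)
load(conn, batch)
after_repeats = snapshot(conn)  # identical to after_first

# A newer version of row 2 replaces the old one instead of adding a row
load(conn, [(2, "shipped", "2024-01-01T11:00:00")])
print(snapshot(conn))
# [(1, 'paid', '2024-01-01T10:00:00'), (2, 'shipped', '2024-01-01T11:00:00')]
```

Both Postgres and SQLite accept an optional WHERE on DO UPDATE. Adding WHERE excluded.updated_at >= orders.updated_at would stop an out-of-order reload from overwriting a newer row with an older one.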
In BigQuery:
MERGE INTO target USING source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
Works for any destination with upsert semantics.
Pattern B: Append + dedupe view.
For destinations where upsert is expensive (some warehouses), append all loads to a “raw” table, then provide a view that deduplicates:
CREATE OR REPLACE VIEW orders AS
SELECT * EXCEPT (rn)  -- BigQuery syntax; Snowflake uses EXCLUDE (rn)
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY loaded_at DESC) AS rn
FROM orders_raw
) AS ranked
WHERE rn = 1;
Append-only writes are fast. Reads through the view see one row per id, at the cost of running the window function on every query. Periodically compact orders_raw by deleting superseded versions.
Common in BigQuery / Snowflake; fits their pricing model.
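Here is a runnable sketch of the append + dedupe-view pattern. It assumes SQLite (3.25 or later, for window functions) in place of a warehouse. The view lists its columns explicitly so rn is not exposed. The append_batch helper and the sample rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders_raw (id INTEGER, status TEXT, loaded_at TEXT);
    -- Latest load per id wins; explicit columns keep rn out of the view
    CREATE VIEW orders AS
    SELECT id, status, loaded_at FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY loaded_at DESC) AS rn
        FROM orders_raw
    ) AS ranked
    WHERE rn = 1;
""")

def append_batch(conn, rows, loaded_at):
    # Append-only: every load adds rows, duplicates included
    with conn:
        conn.executemany(
            "INSERT INTO orders_raw (id, status, loaded_at) VALUES (?, ?, ?)",
            [(r[0], r[1], loaded_at) for r in rows],
        )

append_batch(conn, [(1, "paid"), (2, "new")], "2024-01-01T10:00:00")
append_batch(conn, [(1, "paid"), (2, "new")], "2024-01-01T10:10:00")  # re-run
append_batch(conn, [(2, "shipped")], "2024-01-01T11:00:00")           # newer version

raw_count = conn.execute("SELECT COUNT(*) FROM orders_raw").fetchone()[0]
current = conn.execute("SELECT id, status FROM orders ORDER BY id").fetchall()
print(raw_count)  # 5
print(current)    # [(1, 'paid'), (2, 'shipped')]
```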
Pattern C: Delete-then-insert per batch.
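from sqlalchemy import text

def load_batch(rows):
    # Assumes pg is a SQLAlchemy Engine and rows is a list of dicts keyed by column name
    ids = [r['id'] for r in rows]
    with pg.begin() as tx:  # commits on success, rolls back on any exception
        tx.execute(text("DELETE FROM orders WHERE id = ANY(:ids)"), {"ids": ids})
        # A list of parameter dicts makes SQLAlchemy run the INSERT as an executemany
        tx.execute(text("INSERT INTO orders (...) VALUES (...)"), rows)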
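Both statements run in a single transaction, so readers never see the batch half-deleted, and a failure rolls back the delete. Re-running the same batch deletes and re-inserts the same rows, so the net effect is identical.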
Works well for moderate batch sizes; gets expensive for huge ones.
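Here is the same delete-then-insert idea as a self-contained sketch on SQLite. SQLite is an assumption here, since the snippet above targets Postgres. SQLite has no = ANY(array), so the sketch builds an IN list instead. The last call shows the atomicity that makes the pattern safe: a batch that fails halfway leaves the previous rows untouched.

```python
import sqlite3

def load_batch(conn, rows):
    if not rows:
        return
    ids = [r[0] for r in rows]
    # SQLite has no "= ANY(array)", so build an IN (?, ?, ...) list instead
    placeholders = ", ".join("?" for _ in ids)
    # "with conn" commits if the block succeeds and rolls back if it raises
    with conn:
        conn.execute(f"DELETE FROM orders WHERE id IN ({placeholders})", ids)
        conn.executemany("INSERT INTO orders (id, status) VALUES (?, ?)", rows)

conn = sqlite3.connect(":memory:")
# No primary key: the delete, not a constraint, is what prevents duplicates
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT NOT NULL)")

batch = [(1, "paid"), (2, "new")]
load_batch(conn, batch)
load_batch(conn, batch)  # re-run: deletes both rows, then re-inserts them

try:
    load_batch(conn, [(1, "refunded"), (2, None)])  # None violates NOT NULL mid-batch
except sqlite3.IntegrityError:
    pass  # the rollback also undoes this batch's DELETE

final = sorted(conn.execute("SELECT id, status FROM orders").fetchall())
print(final)  # [(1, 'paid'), (2, 'new')]
```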
Watermark advance ordering
Two strategies:
Advance AFTER the load succeeds. This is the sync() shown earlier. A crash before the watermark update means the next run reloads the batch. Combined with upsert, that is safe.
Advance IN the same transaction as the load. This works when the destination table and the watermark table live in the same database:
BEGIN;
INSERT INTO orders ... ON CONFLICT DO UPDATE ...;
UPDATE etl_state SET high_watermark = $1 WHERE table_name = 'orders';
COMMIT;
Atomic. No risk of inconsistency between load and watermark.
Only works for same-DB sync. For Postgres → BigQuery, fall back to “advance after load.”
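The sketch below shows the same-transaction variant, with SQLite standing in for the shared database (an assumption). The load_and_advance helper is hypothetical. The deliberately bad row shows that a failed load leaves both the table and the watermark where they were.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL, updated_at TEXT);
    CREATE TABLE etl_state (table_name TEXT PRIMARY KEY, high_watermark TEXT NOT NULL);
    INSERT INTO etl_state VALUES ('orders', '1970-01-01T00:00:00');
""")

def load_and_advance(conn, rows):
    new_wm = max(r[2] for r in rows)
    with conn:  # one transaction: rows and watermark commit together or not at all
        conn.executemany(
            """INSERT INTO orders (id, status, updated_at) VALUES (?, ?, ?)
               ON CONFLICT (id) DO UPDATE SET status = excluded.status,
                                              updated_at = excluded.updated_at""",
            rows,
        )
        conn.execute(
            "UPDATE etl_state SET high_watermark = ? WHERE table_name = 'orders'",
            (new_wm,),
        )

def watermark(conn):
    return conn.execute(
        "SELECT high_watermark FROM etl_state WHERE table_name = 'orders'"
    ).fetchone()[0]

load_and_advance(conn, [(1, "paid", "2024-01-01T10:00:00")])

try:
    # Row 2 inserts fine, then row 3's NULL status violates NOT NULL
    load_and_advance(conn, [(2, "new", "2024-01-01T11:00:00"),
                            (3, None, "2024-01-01T11:05:00")])
except sqlite3.IntegrityError:
    pass  # rollback discards row 2 as well; the watermark UPDATE never committed

row_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(row_count, watermark(conn))  # 1 2024-01-01T10:00:00
```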
Multi-watermark scenarios
For pipelines that pull from multiple sources or write to multiple destinations:
CREATE TABLE etl_state (
pipeline_name text NOT NULL,
table_name text NOT NULL,
high_watermark timestamptz NOT NULL,
PRIMARY KEY (pipeline_name, table_name)
);
Each pipeline maintains its own watermark per source table. Pipelines are independent: one pipeline’s failure doesn’t block or rewind another’s progress.
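Here is a sketch of per-pipeline watermark helpers over the two-column key. It assumes SQLite and uses hypothetical pipeline names. A missing row is treated as “start from the beginning”, and set_watermark upserts, so the first run needs no seed row.

```python
import sqlite3

EPOCH = "1970-01-01T00:00:00"

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE etl_state (
        pipeline_name TEXT NOT NULL,
        table_name TEXT NOT NULL,
        high_watermark TEXT NOT NULL,
        PRIMARY KEY (pipeline_name, table_name)
    )
""")

def get_watermark(conn, pipeline, table):
    row = conn.execute(
        "SELECT high_watermark FROM etl_state WHERE pipeline_name = ? AND table_name = ?",
        (pipeline, table),
    ).fetchone()
    # A pipeline/table pair with no row yet starts from the beginning
    return row[0] if row else EPOCH

def set_watermark(conn, pipeline, table, wm):
    # Upsert on the composite key: touches only this pipeline's row for this table
    with conn:
        conn.execute(
            """INSERT INTO etl_state (pipeline_name, table_name, high_watermark)
               VALUES (?, ?, ?)
               ON CONFLICT (pipeline_name, table_name)
               DO UPDATE SET high_watermark = excluded.high_watermark""",
            (pipeline, table, wm),
        )

set_watermark(conn, "pg_to_bq", "orders", "2024-01-01T10:00:00")
set_watermark(conn, "pg_to_search", "orders", "2024-01-01T08:00:00")

print(get_watermark(conn, "pg_to_bq", "orders"))      # 2024-01-01T10:00:00
print(get_watermark(conn, "pg_to_search", "orders"))  # 2024-01-01T08:00:00
print(get_watermark(conn, "pg_to_bq", "customers"))   # 1970-01-01T00:00:00
```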
Late-arriving data
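Some data arrives late. A naive “watermark on updated_at” misses rows like this:
- A long-running transaction stamps a row with updated_at = T (Postgres now() returns the transaction’s start time) but does not commit until T+5min.
- The pipeline runs at T+2min, sees other rows committed with updated_at up to T+1min, and advances the watermark to T+1min.
- The transaction commits at T+5min. Its row is now visible, but updated_at = T is below the watermark, so no later updated_at > watermark query returns it.
Clock skew between the application servers that set updated_at produces the same gap.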
Strategies:
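Subtract a buffer. The pipeline queries updated_at > watermark - 10min, re-fetching recent rows, and the upsert absorbs the duplicates. This trades extra reads for safety. It only helps if the buffer exceeds the longest commit delay or clock skew you expect.
Use a sequence-based watermark. Postgres transaction IDs (or sequences) are assigned in increasing order, unlike updated_at, which depends on clocks: SELECT * FROM orders WHERE xmin::text::bigint > <last_xmin> is more robust. It is still not a perfect fence. A transaction with a lower ID can commit after one with a higher ID, and xmin is a 32-bit counter that wraps around, so pair it with a small lookback.
CDC instead. Change data capture reads the database’s change log (in Postgres, logical decoding of the WAL). It sees every committed change in commit order and never relies on timestamps.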
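Here is a pure-Python simulation of the late-commit scenario and the buffer strategy. Rows are plain dicts. The 10-minute LOOKBACK is an assumed value; in practice it must exceed your worst-case commit delay or clock skew.

```python
from datetime import datetime, timedelta

LOOKBACK = timedelta(minutes=10)  # assumption: larger than the slowest commit or worst skew

def extract_since(rows, watermark, lookback=LOOKBACK):
    """Return currently visible rows with updated_at > watermark - lookback."""
    lower = watermark - lookback
    return [r for r in rows if r["updated_at"] > lower]

T = datetime(2024, 1, 1, 12, 0)

# Run at T+2min: only the row stamped T+1min is committed and visible
visible = [{"id": 2, "updated_at": T + timedelta(minutes=1)}]
first_run = extract_since(visible, watermark=T - timedelta(hours=1))
watermark = max(r["updated_at"] for r in first_run)  # advances to T+1min

# At T+5min the long transaction commits a row stamped with its start time, T
visible.append({"id": 1, "updated_at": T})

naive = [r for r in visible if r["updated_at"] > watermark]  # no buffer
buffered = extract_since(visible, watermark)                 # 10-minute buffer

print([r["id"] for r in naive])           # []: row 1 is lost
print(sorted(r["id"] for r in buffered))  # [1, 2]: row 2 re-fetched, upsert dedups it
```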
“Exactly once” — partial myth
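In distributed systems, exactly-once delivery is impossible in general. A sender that never receives an acknowledgment cannot tell whether the message or the ack was lost. What you get:
- At-most-once: messages may be lost. Bad for ETL.
- At-least-once: messages may be duplicated. With idempotent processing, indistinguishable from exactly-once.
- Exactly-once semantics: at-least-once delivery + idempotent processing.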
ETL pattern: at-least-once delivery + idempotent destination = effectively exactly-once. Don’t chase strict exactly-once protocols; just be idempotent.
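A toy model of at-least-once delivery makes the point concrete. A sender that resends un-acked messages feeds two sinks, one appending and one keyed. Everything here, including which acks are lost, is a hypothetical simulation rather than a real broker.

```python
def deliver_at_least_once(messages, sink, lost_acks):
    """Deliver every message at least once; resend any whose ack was 'lost'."""
    for msg in messages:
        sink(msg)
        if msg["id"] in lost_acks:
            sink(msg)  # the sender never saw the ack, so it retries

appended = []  # non-idempotent sink: plain append
upserted = {}  # idempotent sink: keyed write

def upsert_sink(msg):
    upserted[msg["id"]] = msg  # replaying the same message rewrites the same key

messages = [{"id": i, "status": "paid"} for i in range(10)]
lost = {2, 5}  # hypothetical: acks for messages 2 and 5 never arrive

deliver_at_least_once(messages, appended.append, lost)
deliver_at_least_once(messages, upsert_sink, lost)

print(len(appended))  # 12: two duplicates leaked through
print(len(upserted))  # 10: same end state as exactly-once delivery
```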
Testing idempotency
def test_orders_pipeline_idempotent():
    seed_orders(10)
    sync("orders")
    count1 = bq.row_count("orders")
    # Re-run without source changes
    sync("orders")
    count2 = bq.row_count("orders")
    assert count1 == count2, "second run should not add rows"
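Add this test and run it on every CI build. It catches append-instead-of-upsert bugs early. Equal counts only rule out duplicates, so comparing the full destination contents across the two runs is a stricter check.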
Common Pitfalls
Append-only without dedup view. Duplicates accumulate. Use upsert or view-based dedup.
Watermark advanced before load. Crash leaves data not loaded; next run skips it.
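Watermark stored in the destination but updated outside the load’s transaction. The watermark update can succeed while the load fails. Either update it in the same transaction as the load, or keep it in a separate metadata table (for example, in the source DB) and advance it only after the load succeeds.
Multiple workers updating the same watermark. They race, and one overwrites the other’s progress. Take a lock (for example, a Postgres advisory lock) or assign each table to exactly one worker.
No LIMIT on extract. A re-run after long downtime can pull 100M rows in one query and run out of memory. Extract in bounded batches, ordered by the watermark column, and advance the watermark after each batch loads.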
Re-loading the same batch when source didn’t change. Wastes resources. Skip if extract returns empty.
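One caveat on the LIMIT advice: if the watermark is a bare timestamp, a LIMIT can cut through a group of rows that share one updated_at. Advancing to that timestamp then skips the rest of the group. A common fix, swapped in here and not part of the original, is keyset pagination on a composite (updated_at, id) watermark. That means storing the id alongside high_watermark. The sketch uses SQLite and hypothetical data.

```python
import sqlite3

BATCH_SIZE = 2  # tiny for the demo; a real pipeline would use thousands

def naive_batches(conn, wm_ts, batch_size=BATCH_SIZE):
    """Timestamp-only watermark with LIMIT: can skip rows that share a timestamp."""
    while True:
        rows = conn.execute(
            "SELECT id, updated_at FROM orders WHERE updated_at > ? "
            "ORDER BY updated_at LIMIT ?",
            (wm_ts, batch_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        wm_ts = rows[-1][1]

def keyset_batches(conn, wm_ts, wm_id, batch_size=BATCH_SIZE):
    """Composite (updated_at, id) watermark: resumes exactly after the last row returned."""
    while True:
        rows = conn.execute(
            "SELECT id, updated_at FROM orders "
            "WHERE updated_at > ? OR (updated_at = ? AND id > ?) "
            "ORDER BY updated_at, id LIMIT ?",
            (wm_ts, wm_ts, wm_id, batch_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        wm_id, wm_ts = rows[-1]  # in a real pipeline, persist this pair after load(rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at TEXT)")
with conn:
    conn.executemany("INSERT INTO orders VALUES (?, ?)", [
        (1, "2024-01-01T10:00:00"),
        (2, "2024-01-01T10:00:00"),
        (3, "2024-01-01T10:00:00"),  # three rows share one timestamp
        (4, "2024-01-01T10:05:00"),
    ])

EPOCH = "1970-01-01T00:00:00"
naive_seen = [r[0] for batch in naive_batches(conn, EPOCH) for r in batch]
keyset_seen = [r[0] for batch in keyset_batches(conn, EPOCH, 0) for r in batch]

print(len(naive_seen))  # 3: one of rows 1-3 was cut by LIMIT and then skipped
print(keyset_seen)      # [1, 2, 3, 4]
```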
Wrapping Up
Watermark table + upsert destination + advance-after-load = idempotent pipelines. Tested by re-running with no source changes. Monday: schema drift handling.