Monitoring ETL Pipelines, Lag, Errors, Throughput
October 28, 2022 · 4 min read · by Muhammad Amal · programming

TL;DR — Monitor 4 things: freshness lag (now() - last_watermark), success rate, row counts (anomaly detection), throughput. Alert on lag > 1 hour, sustained failures, and row count anomalies. Pushgateway + Prometheus is fine for cron-style pipelines.

With backfilling covered, this post closes out the operational side. ETL without monitoring is data that may or may not exist, so here is what to actually monitor.

Four things to measure

Freshness lag. How stale is the destination data?

time() - etl_last_success_timestamp{table="orders"}

Critical metric. Answers “is the data current?” Most important alert.
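
Outside PromQL, the same calculation is one subtraction between "now" and the last successful watermark. A minimal Python sketch for intuition; the function name and the sample timestamps are illustrative, not part of the pipeline:

```python
from datetime import datetime, timezone


def freshness_lag_seconds(last_success: datetime, now: datetime) -> float:
    """Seconds elapsed since the last successful sync (never negative)."""
    return max(0.0, (now - last_success).total_seconds())


# Illustrative timestamps: last success at 15:42 UTC, checked at 16:00 UTC.
last_success = datetime(2022, 10, 28, 15, 42, tzinfo=timezone.utc)
now = datetime(2022, 10, 28, 16, 0, tzinfo=timezone.utc)

lag = freshness_lag_seconds(last_success, now)
print(f"lag: {lag / 60:.0f} min")
```

Clamping at zero avoids a negative lag if the clocks of the pipeline host and the checker drift slightly.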

Success rate. Fraction of runs that succeeded.

sum(rate(etl_runs_total{status="success",table="orders"}[1h]))
/
sum(rate(etl_runs_total{table="orders"}[1h]))

Throughput. Rows per run, rows per second.

sum by (table) (rate(etl_rows_synced_total[5m]))

Row count anomalies. Sudden drops in volume usually indicate breakage.

sum by (table) (increase(etl_rows_synced_total[24h]))

Compare today vs yesterday vs last week.
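
The comparison itself is a relative change against each baseline. A small Python sketch of that arithmetic; the helper names and the sample volumes are made up for illustration, and a zero baseline is reported as None rather than dividing by zero:

```python
from typing import Dict, Optional


def pct_change(current: float, baseline: float) -> Optional[float]:
    """Relative change of current vs baseline; None when the baseline is zero."""
    if baseline == 0:
        return None
    return (current - baseline) / baseline


def compare_volumes(today: int, yesterday: int, last_week: int) -> Dict[str, Optional[float]]:
    """Compare today's row count against the two baselines."""
    return {
        "vs_yesterday": pct_change(today, yesterday),
        "vs_last_week": pct_change(today, last_week),
    }


# Hypothetical 24h row counts for one table.
result = compare_volumes(today=45231, yesterday=44344, last_week=90000)
for name, change in result.items():
    label = "n/a" if change is None else f"{change:+.1%}"
    print(f"{name}: {label}")
```

In this made-up sample the day-over-day change is small while the week-over-week drop is close to half, which is the pattern that usually deserves a look.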

Pushgateway for cron pipelines

Cron pipelines run, finish, exit. By the time Prometheus scrapes, the process is gone.

Solution: pushgateway. Pipelines push metrics; Prometheus scrapes the pushgateway:

services:
  pushgateway:
    image: prom/pushgateway:v1.4.3
    ports: ["9091:9091"]

In Prometheus config:

scrape_configs:
  - job_name: pushgateway
    honor_labels: true  # keep the job label the pipelines pushed
    static_configs:
      - targets: ['pushgateway:9091']

Pipeline:

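import time

from prometheus_client import CollectorRegistry, Counter, Gauge, pushadd_to_gateway

# Fresh registry per run: only this run's metrics are pushed.
registry = CollectorRegistry()
rows_synced = Counter('etl_rows_synced_total', 'Rows synced', ['table'], registry=registry)
last_success = Gauge('etl_last_success_timestamp', 'Last success', ['table'], registry=registry)
duration = Gauge('etl_run_duration_seconds', 'Run duration', ['table'], registry=registry)

start = time.time()
try:
    n = sync_orders()  # the pipeline's own sync function; returns the number of rows synced
    rows_synced.labels(table='orders').inc(n)
    last_success.labels(table='orders').set_to_current_time()
finally:
    # Runs on success and on failure; any exception still propagates.
    # On failure last_success is never set, so it is not part of the push.
    duration.labels(table='orders').set(time.time() - start)
    # pushadd (POST) replaces only the metric names included in this push.
    pushadd_to_gateway('pushgateway:9091', job='sync_orders', registry=registry)

Successful runs push a fresh success timestamp. Failed runs push the duration but not the timestamp, so the previously pushed timestamp keeps aging and the staleness alert catches the failure. This relies on pushadd_to_gateway: push_to_gateway replaces every metric in the job's group, which would delete the old timestamp on a failed run and leave the alert with no series to evaluate.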

Dashboards

Grafana dashboard per pipeline. Useful panels:

┌──────────────────────────────────────────────────────────────┐
│ Pipeline: orders → BigQuery                                   │
├──────────────────────────────────────────────────────────────┤
│ Freshness lag:     18 min      (target: < 60 min)            │
│ Last success:      18 min ago  (15:42)                       │
│ Last 24h rows:     45,231                                    │
│ vs previous 24h:   +2%                                       │
├──────────────────────────────────────────────────────────────┤
│ [Time series: rows/run over time]                            │
├──────────────────────────────────────────────────────────────┤
│ [Time series: run duration over time]                        │
├──────────────────────────────────────────────────────────────┤
│ Recent runs:                                                 │
│   15:42  success  120 rows  3.2s                             │
│   15:27  success  140 rows  3.1s                             │
│   15:12  failed   -        12s     [view log]                │
└──────────────────────────────────────────────────────────────┘

One screen per pipeline. Operations check daily.

Alerts

Lag alert (most important):

- alert: ETLPipelineStale
  expr: |
    time() - etl_last_success_timestamp > 3600
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Pipeline {{ $labels.table }} hasn't succeeded in 1 hour"

For critical tables: 1-hour threshold. For less critical: 6-12 hours.
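
One way to keep per-table thresholds in one place is a simple lookup with a default. A Python sketch of the idea; the table names other than orders and the default tier are assumptions for illustration, and in Prometheus the equivalent would be one rule per tier:

```python
# Staleness thresholds in seconds, keyed by table.
# Only "orders" comes from this post; the other names are hypothetical.
STALE_AFTER_SECONDS = {
    "orders": 1 * 3600,              # critical: 1 hour
    "marketing_events": 6 * 3600,    # less critical: 6 hours
    "archive_snapshots": 12 * 3600,  # least critical: 12 hours
}
DEFAULT_STALE_AFTER_SECONDS = 6 * 3600  # assumed default for unlisted tables


def is_stale(table: str, lag_seconds: float) -> bool:
    """True when the table's freshness lag exceeds its threshold."""
    threshold = STALE_AFTER_SECONDS.get(table, DEFAULT_STALE_AFTER_SECONDS)
    return lag_seconds > threshold


print(is_stale("orders", 2 * 3600))            # two hours behind on a critical table
print(is_stale("archive_snapshots", 2 * 3600)) # same lag on a relaxed table
```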

Repeated failure alert:

- alert: ETLPipelineFailures
  expr: |
    sum by (table) (increase(etl_runs_total{status="failure"}[1h])) >= 3
  for: 5m
  labels:
    severity: warning

3+ failures in an hour = real problem.
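
The logic behind the rule is a count of failures inside a sliding window. A Python sketch with made-up run histories, showing how a single blip stays quiet while a run of failures trips the threshold; the function names and sample data are illustrative:

```python
from datetime import datetime, timedelta
from typing import Iterable, Tuple

Run = Tuple[datetime, str]  # (finished_at, status)


def failures_in_window(runs: Iterable[Run], now: datetime,
                       window: timedelta = timedelta(hours=1)) -> int:
    """Count failed runs that finished within the window ending at now."""
    cutoff = now - window
    return sum(
        1 for finished_at, status in runs
        if status == "failure" and cutoff < finished_at <= now
    )


def should_alert(runs: Iterable[Run], now: datetime, threshold: int = 3) -> bool:
    """Alert only when failures in the last hour reach the threshold."""
    return failures_in_window(runs, now) >= threshold


now = datetime(2022, 10, 28, 16, 0)
# Hypothetical histories for a pipeline that runs every 15 minutes.
one_blip = [
    (now - timedelta(minutes=45), "success"),
    (now - timedelta(minutes=30), "failure"),
    (now - timedelta(minutes=15), "success"),
]
outage = [(now - timedelta(minutes=m), "failure") for m in (45, 30, 15)]

print(should_alert(one_blip, now), should_alert(outage, now))
```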

Row count anomaly:

- alert: ETLRowCountAnomaly
  expr: |
    abs(
      increase(etl_rows_synced_total[24h])
      -
      increase(etl_rows_synced_total[24h] offset 7d)
    ) / increase(etl_rows_synced_total[24h] offset 7d) > 0.5
  for: 1h

50% deviation from same-day-last-week. Catches “pipeline broken but watermark still advancing” failures.

Data quality checks beyond pipelines

Pipeline-level monitoring catches “didn’t run.” Data quality monitoring catches “ran but produced bad data.”

Examples:

  • Null rate per column over time
  • Distinct value count
  • Aggregate sums (e.g., daily revenue should never be negative)

For these, use tools like dbt tests or Great Expectations. They are out of scope for “lightweight ETL” but worth knowing once the requirement becomes “we need to know our data is right.”
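
To make the three example checks concrete, here is a plain-Python sketch over a list of row dicts. It is not how dbt or Great Expectations implement them; the helper names and the sample rows are invented for illustration:

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def null_rate(rows: List[Row], column: str) -> float:
    """Fraction of rows where the column is missing or None."""
    if not rows:
        return 0.0
    nulls = sum(1 for row in rows if row.get(column) is None)
    return nulls / len(rows)


def distinct_count(rows: List[Row], column: str) -> int:
    """Number of distinct non-null values in the column."""
    return len({row[column] for row in rows if row.get(column) is not None})


def column_sum(rows: List[Row], column: str) -> float:
    """Sum of the column, ignoring nulls."""
    return sum(row[column] for row in rows if row.get(column) is not None)


# Hypothetical sample of synced rows.
sample_rows = [
    {"order_id": 1, "country": "ID", "revenue": 120.0},
    {"order_id": 2, "country": "SG", "revenue": 80.5},
    {"order_id": 3, "country": None, "revenue": 40.0},
    {"order_id": 4, "country": "ID", "revenue": None},
]

print("null rate (country):", null_rate(sample_rows, "country"))
print("distinct countries:", distinct_count(sample_rows, "country"))
print("revenue is non-negative:", column_sum(sample_rows, "revenue") >= 0)
```

Tracking these numbers per run, rather than checking them once, is what turns them into the "over time" signals listed above.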

Logging

Structured logs from pipelines, shipped to Loki (covered in Loki post):

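import json
import logging

# Without a handler and level, INFO messages are dropped by default.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)

def log_run(wm_before, rows, duration, status="success"):
    # One JSON object per line so the `| json` stage in LogQL can parse it.
    logger.info(json.dumps({
        "table": "orders",
        "watermark_before": str(wm_before),
        "rows": len(rows),
        "duration_s": duration,
        "status": status,
    }))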

Loki indexes only stream labels such as job; the json stage extracts table and status from each line at query time. Query:

{job="etl"} | json | table="orders" | status="failure"

Show me all failure logs for the orders pipeline. Useful for incident review.
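
For readers new to LogQL, the query is roughly "parse each line as JSON, then keep the lines whose fields match". A Python sketch of that filtering logic, for intuition only; it does not talk to Loki, and the function name and sample lines are invented:

```python
import json
from typing import Iterable, Iterator


def filter_json_logs(lines: Iterable[str], **wanted: str) -> Iterator[dict]:
    """Yield parsed log records whose fields equal all the wanted values."""
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # non-JSON lines cannot match a field filter
        if not isinstance(record, dict):
            continue
        if all(record.get(key) == value for key, value in wanted.items()):
            yield record


# Hypothetical log lines as the pipelines would emit them.
sample_lines = [
    '{"table": "orders", "rows": 120, "status": "success"}',
    '{"table": "orders", "rows": 0, "status": "failure"}',
    '{"table": "users", "rows": 0, "status": "failure"}',
    "plain text line that is not JSON",
]

failures = list(filter_json_logs(sample_lines, table="orders", status="failure"))
print(failures)
```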

Common Pitfalls

No monitoring. Pipeline silently broken for a week.

Only counts, no freshness. Pipeline still running but watermark stuck → unnoticed.

Alert on every failure. Single transient failure = noise. Use repeated-failure alert.

Anomaly alerts too tight. Real workload varies; alerts fire daily. Loosen the threshold.

No links in alerts to dashboards / logs. On-call gets paged with no context. Always include links.

Pushgateway as singleton. Goes down = all pipeline metrics gone. Run with HA or accept the gap.

Wrapping Up

Push freshness + counts + duration to Prometheus; alert on stale + repeated failures. Monday: October retro.