Debezium for Postgres → Kafka CDC
October 21, 2022 · 4 min read · by Muhammad Amal programming

TL;DR — Debezium reads Postgres logical replication, writes to Kafka. Runs as a Kafka Connect connector. Configure source DB, topic naming, snapshot mode. One JSON config + a Connect REST API call. Handles schema changes, captures deletes, supports outbox pattern.

With Kafka as the backbone, the most-used CDC producer is Debezium. It’s how you get Postgres changes into Kafka without writing custom replication code.

Architecture

[Postgres] → logical replication slot
         → [Debezium connector running on Kafka Connect]
         → [Kafka topics: topicPrefix.schema.table]

Debezium is a Kafka Connect connector — JVM-based. Reads Postgres WAL via pgoutput plugin, emits Kafka messages.

Deployment

Two parts:

  1. Kafka Connect cluster (one or more workers)
  2. Debezium connector running on Connect

Connect cluster runs as a separate process from Kafka brokers. For most setups, 1-3 Connect workers handle the load.

Compose example:

services:
  kafka:
    image: confluentinc/cp-kafka:7.2.0
    # ...

  connect:
    image: debezium/connect:2.0
    depends_on: [kafka]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
    ports: ["8083:8083"]

Connect exposes REST on 8083 for managing connectors.

Source Postgres config

Recap from logical replication post:

wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

Plus a user with replication:

CREATE USER debezium WITH REPLICATION PASSWORD '...';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

Connector configuration

POST to Connect’s REST API:

{
  "name": "billing-postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "...",
    "database.dbname": "app",
    "topic.prefix": "billing",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.subscriptions,public.invoices,public.payments",
    "snapshot.mode": "initial",
    "decimal.handling.mode": "string",
    "time.precision.mode": "adaptive_time_microseconds"
  }
}

Key config:

  • topic.prefix — Kafka topics get prefixed (e.g., billing.public.subscriptions)
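  • plugin.name: pgoutput — the logical decoding plugin built into Postgres 10+, so nothing extra to install (the alternative is decoderbufs; wal2json support was removed in Debezium 2.0)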
  • table.include.list — which tables to CDC
  • snapshot.mode: initial — snapshot tables first, then stream changes
  • decimal.handling.mode: string — money columns as strings (lossless)
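To see which topics a config like this produces, here is a minimal sketch that expands table.include.list into topic names. It assumes every entry is a literal schema.table name (Debezium treats the entries as regular expressions, which this does not handle); cdc_topics is a hypothetical helper, not part of Debezium.

```python
def cdc_topics(topic_prefix: str, table_include_list: str) -> list[str]:
    """Expand a comma-separated table.include.list into topic names
    of the form <topic.prefix>.<schema>.<table>."""
    topics = []
    for entry in table_include_list.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and stray whitespace
        schema, table = entry.split(".", 1)
        topics.append(f"{topic_prefix}.{schema}.{table}")
    return topics


topics = cdc_topics(
    "billing", "public.subscriptions,public.invoices,public.payments"
)
print(topics)
```

Handy for pre-creating topics with explicit partition counts and retention instead of relying on auto-creation.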

POST it:

curl -X POST -H "Content-Type: application/json" \
  --data @config.json \
  http://connect:8083/connectors

Within seconds, topics start filling.
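The same registration can be scripted. The sketch below builds the POST /connectors request with Python's standard library. The helper name build_register_request is hypothetical, the config is abbreviated, and the actual send is commented out so the snippet runs without a live Connect cluster.

```python
import json
import urllib.request


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build the POST /connectors request that registers a connector."""
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


connector = {
    "name": "billing-postgres-cdc",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "topic.prefix": "billing",
        # remaining keys as in the JSON config shown earlier
    },
}
request = build_register_request("http://connect:8083", connector)

# Uncomment to send against a running Connect worker:
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(response.status)
```

Building the request separately from sending it keeps the config easy to review or unit-test before it touches the cluster.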

Message format

Each row change becomes a Kafka message:

{
  "before": null,
  "after": {
    "id": 42,
    "customer_id": 100,
    "status": "active",
    "plan_id": "pro_monthly",
    "started_at": "2022-10-21T09:14:33Z"
  },
  "source": {
    "version": "2.0.0",
    "connector": "postgresql",
    "name": "billing",
    "ts_ms": 1666347273000,
    "snapshot": "false",
    "db": "app",
    "schema": "public",
    "table": "subscriptions",
    "txId": 12345,
    "lsn": 24816088
  },
  "op": "c",
  "ts_ms": 1666347273100
}

op: c = create, u = update, d = delete, r = read (during snapshot).

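For UPDATE, after has the new state. before has the full old state only if the table is set to REPLICA IDENTITY FULL; with the default replica identity it is null or carries just the primary key columns. With full before images, consumers can compute diffs.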

For DELETE, after is null. Useful for deletion propagation.
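On the consumer side, the op codes map naturally onto upserts and deletes. A minimal sketch, assuming events have already been deserialized to dicts, rows are keyed by an id column, and before carries the full old row (which in Postgres generally requires REPLICA IDENTITY FULL). apply_change and changed_columns are hypothetical helpers.

```python
def apply_change(table: dict, event: dict) -> None:
    """Apply one change event to an in-memory copy of the table, keyed by id."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # create, snapshot read, and update all carry the new row in "after"
        row = event["after"]
        table[row["id"]] = row
    elif op == "d":
        # "after" is null on deletes, so the key comes from "before"
        table.pop(event["before"]["id"], None)


def changed_columns(event: dict) -> dict:
    """Return {column: (old, new)} for columns whose value differs."""
    before = event.get("before") or {}
    after = event.get("after") or {}
    return {k: (before.get(k), v) for k, v in after.items() if before.get(k) != v}


replica = {}
apply_change(replica, {"op": "c", "before": None,
                       "after": {"id": 42, "status": "active"}})
update = {"op": "u",
          "before": {"id": 42, "status": "active"},
          "after": {"id": 42, "status": "cancelled"}}
apply_change(replica, update)
diff = changed_columns(update)
apply_change(replica, {"op": "d",
                       "before": {"id": 42, "status": "cancelled"},
                       "after": None})
```

If before is null, changed_columns reports every column as changed, so treat its output as meaningful only when full before images are available.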

Snapshot mode

Three useful values:

  • initial — snapshot once, then stream. Default.
  • never — no snapshot; only stream from now. Used when destination already has data.
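  • when_needed — snapshot on startup if no offset is stored or the stored offset is no longer available on the server; otherwise stream. Check that your connector version supports it.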

For first-time setup: initial. The snapshot can take hours for large tables; Debezium runs it in a read-only transaction, so normal queries continue alongside it.

Schema in messages

With the JSON converter and schemas enabled (value.converter plus value.converter.schemas.enable, which is on by default), every message is wrapped in a schema envelope:

{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "struct", "name": "Value", "fields": [...]},
      {"type": "string", "name": "Source"},
      {"type": "string", "name": "Op"}
    ]
  },
  "payload": { ... }
}

The embedded schema can double the message size or worse. For high-volume topics, either set value.converter.schemas.enable to false or use Confluent Schema Registry + Avro for a compact wire format:

{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

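Schemas live in the registry; messages reference them by ID; the payload is binary Avro. The debezium/connect image may not bundle the Confluent converter jars, so check the plugin path and add them if needed.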
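"Reference by ID" is concrete on the wire: in the Confluent wire format each message starts with a zero magic byte and a 4-byte big-endian schema ID, followed by the Avro body. A minimal sketch of splitting that header off; split_confluent_frame is a hypothetical helper and the sample frame is made up for illustration.

```python
import struct


def split_confluent_frame(message: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    if len(message) < 5:
        raise ValueError("message too short for Confluent wire format")
    # ">bI": big-endian, one signed byte (magic) + one unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# Made-up frame: magic byte 0, schema ID 17, then two bytes of Avro body
frame = b"\x00" + (17).to_bytes(4, "big") + b"\x02\x54"
schema_id, avro_body = split_confluent_frame(frame)
print(schema_id, avro_body)
```

In practice a Schema Registry client library does this for you; the sketch is mainly useful when debugging raw bytes from a topic.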

Outbox pattern

For application-driven events instead of pure CDC:

CREATE TABLE outbox (
  id bigserial PRIMARY KEY,
  aggregate_type text,
  aggregate_id text,
  type text,
  payload jsonb,
  created_at timestamptz DEFAULT now()
);

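The application writes to the outbox in the same transaction as the business change. The outbox table has to be covered by table.include.list. Debezium then routes outbox rows with a single message transform (SMT), the EventRouter: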

{
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.route.by.field": "aggregate_type",
  "transforms.outbox.table.field.event.key": "aggregate_id"
}

Each outbox row becomes a Kafka message on a topic derived from the routing field (by default outbox.event.<aggregate_type>). Clean event-driven architecture without distributed transactions.
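The important property is that the business change and the outbox row commit or roll back together. The sketch below demonstrates that with Python's built-in sqlite3 as a stand-in for Postgres (an assumption made purely so the example runs anywhere). The subscriptions table, the SubscriptionCancelled event type, and cancel_subscription are hypothetical.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE subscriptions (id INTEGER PRIMARY KEY, status TEXT);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        aggregate_type TEXT,
        aggregate_id TEXT,
        type TEXT,
        payload TEXT
    );
    INSERT INTO subscriptions (id, status) VALUES (42, 'active');
    """
)


def cancel_subscription(conn, subscription_id, fail=False):
    # "with conn" commits on success and rolls back on any exception,
    # so the UPDATE and the outbox INSERT succeed or fail as one unit.
    with conn:
        conn.execute(
            "UPDATE subscriptions SET status = 'cancelled' WHERE id = ?",
            (subscription_id,),
        )
        conn.execute(
            "INSERT INTO outbox (aggregate_type, aggregate_id, type, payload) "
            "VALUES (?, ?, ?, ?)",
            ("subscription", str(subscription_id), "SubscriptionCancelled",
             json.dumps({"id": subscription_id, "status": "cancelled"})),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")


def snapshot(conn):
    status = conn.execute(
        "SELECT status FROM subscriptions WHERE id = 42").fetchone()[0]
    events = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
    return status, events


try:
    cancel_subscription(conn, 42, fail=True)
except RuntimeError:
    pass
after_failure = snapshot(conn)   # nothing changed, no event recorded

cancel_subscription(conn, 42)
after_success = snapshot(conn)   # row updated and exactly one event recorded
print(after_failure, after_success)
```

Against Postgres the shape is the same with a real driver: one transaction, two writes, and Debezium picks up the outbox insert from the WAL after commit.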

Operational concerns

Replication slot lag. Covered in Postgres logical replication post. The biggest CDC pitfall.
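Slot lag is easy to quantify once you have two LSNs, for example pg_current_wal_lsn() and the slot's restart_lsn from pg_replication_slots. A minimal sketch of the arithmetic, assuming the LSNs arrive in Postgres's X/Y hex text form. The threshold and the sample values are hypothetical.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a Postgres LSN in 'X/Y' hex text form to a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slot_lag_bytes(current_wal_lsn: str, restart_lsn: str) -> int:
    """Approximate bytes of WAL the slot is holding back."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(restart_lsn)


MAX_LAG_BYTES = 1024 ** 3  # hypothetical alert threshold: 1 GiB

lag = slot_lag_bytes("0/17AB9D8", "0/17AA9D8")
if lag > MAX_LAG_BYTES:
    print(f"replication slot is retaining {lag} bytes of WAL")
else:
    print(f"slot lag OK: {lag} bytes")
```

Postgres can do the same subtraction server-side with pg_wal_lsn_diff; the Python version is useful when the LSNs come from metrics or logs instead.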

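Connect cluster scaling. A single Connect worker handles roughly 10K msg/sec. Past that, scale out workers, keeping in mind that a Postgres connector always runs as one task: extra workers spread multiple connectors and provide failover, but do not parallelize a single connector.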

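Connector restart. Debezium handles a connector restart gracefully (resumes from the last committed offset). After a DB restart it reconnects and continues from the replication slot, whatever the snapshot mode. Watch for the slot getting dropped and recreated: changes written in between never reach the connector, and recovering them means re-snapshotting from scratch.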
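A failed task does not restart itself by default, so it is worth polling GET /connectors/<name>/status on the Connect REST API. The sketch below only inspects the response shape. failed_parts is a hypothetical helper and the sample status is hand-written, not captured from a real cluster.

```python
def failed_parts(status: dict) -> list[str]:
    """Return which parts of a connector status response are in FAILED state."""
    failed = []
    if status["connector"]["state"] == "FAILED":
        failed.append("connector")
    for task in status.get("tasks", []):
        if task["state"] == "FAILED":
            failed.append(f"task {task['id']}")
    return failed


# Hand-written sample in the shape returned by /connectors/<name>/status
sample_status = {
    "name": "billing-postgres-cdc",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "connect:8083", "trace": "..."}
    ],
}
problems = failed_parts(sample_status)
print(problems)
```

A failed task can then be restarted with POST /connectors/<name>/tasks/<id>/restart once the underlying cause is fixed.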

Schema changes. Debezium handles ALTER TABLE additive changes automatically. Destructive (DROP COLUMN) requires care.

Common Pitfalls

No replication slot monitoring. Connector crashes; slot retains WAL forever; disk fills.

snapshot.mode: never without an existing snapshot. Destination misses historical rows.

Schema registry costs. Self-hosted is fine; managed schema registries can be expensive at scale.

Outbox pattern without cleanup. The outbox table grows forever. Delete processed rows periodically; Debezium has already read the insert from the WAL, so removing the row afterwards is safe.

Topic compaction on CDC topics. Compaction deletes old changes; you lose the change history. Use compaction only for snapshot-style topics.

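Decimal columns without string mode. The default mode (precise) keeps full precision but encodes values as binary, which shows up as base64 in JSON and is easy to mishandle; double mode loses precision. Set decimal.handling.mode: string.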
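For consumers, the difference looks like this. With string mode a value parses directly into a Decimal. With precise mode under the JSON converter, the value is a base64 string holding the unscaled integer as big-endian two's complement bytes, with the scale carried in the schema. A minimal sketch, with decode_precise_decimal as a hypothetical helper and hand-made sample values:

```python
import base64
from decimal import Decimal


def decode_precise_decimal(encoded: str, scale: int) -> Decimal:
    """Decode a base64-encoded unscaled integer plus scale into a Decimal."""
    unscaled = int.from_bytes(
        base64.b64decode(encoded), byteorder="big", signed=True
    )
    return Decimal(unscaled).scaleb(-scale)


# string mode: the message already carries "123.45"
from_string_mode = Decimal("123.45")

# precise mode: "MDk=" is base64 for bytes 0x30 0x39, i.e. unscaled 12345
from_precise_mode = decode_precise_decimal("MDk=", scale=2)

print(from_string_mode, from_precise_mode)
```

Either way, avoid routing money through float on the consumer side; that is where precision actually gets lost.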

Wrapping Up

Debezium → Kafka = standard Postgres CDC stack. One config + REST POST = production CDC. Monday: Postgres COPY for bulk loads.