Publish PostgreSQL Data Changes as Kafka Events using Debezium Connector

Debezium Connector is used to capture row-level changes and publish them as Events to your Kafka Cluster without the need to write custom code.

If you have a requirement for CDC (change data capture) in an Event-Driven Architecture, then consider Debezium.

Debezium supports JSON serialization out of the box, or you can use a custom converter, for example for Avro Serialization.
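Converters can be set per connector in its registration config (shown later in this post). As a minimal sketch, assuming the Confluent Avro converter is on the Connect worker's classpath and a Schema Registry is reachable at http://schema-registry:8081 (neither is part of the setup below):

"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"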

The Debezium PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. For information about the PostgreSQL versions that are compatible with the connector, see the Debezium release overview.

The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to a PostgreSQL database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.

https://debezium.io/documentation/reference/stable/connectors/postgresql.html
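For example, an insert produces a change event value shaped roughly like the following (simplified; a real event carries more source metadata, and the database/table names here are illustrative). The "op" field encodes the operation, "c" meaning create/insert:

{
  "before": null,
  "after": { "id": 1, "message": "hello" },
  "source": { "connector": "postgresql", "db": "mydb", "schema": "public", "table": "customers" },
  "op": "c",
  "ts_ms": 1674986260637
}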

The Architecture of Debezium is described in the Debezium documentation: https://debezium.io/documentation/reference/stable/architecture.html


We will use Docker to set up all the required components locally. In a real environment, you could deploy on Kubernetes/OpenShift, or run Debezium embedded within your application, although the embedded engine will not have the same level of fault tolerance/reliability as running it combined with a Kafka Cluster.

Setup:

Docker Containers:

Create a docker-compose.yml file with the below contents:

version: "3"

services:
  zookeeper:
    container_name: zookeeper
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    container_name: kafka
    image: docker.io/bitnami/kafka:3.3
    ports:
      - "9092:9092"
      - "9094:9094"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL      
    depends_on:
      - zookeeper
  connect:
    container_name: connect
    image: quay.io/debezium/connect:2.1
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - kafka
  postgres:
    container_name: postgres
    image: postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"      
    
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local


Run docker-compose up -d.

Docker will create a default network, named after the directory you ran docker-compose from. The common network allows all four containers to communicate with each other.

If you run each container separately, then you will need to create a network yourself and attach all containers to it.
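A minimal sketch of the manual approach (the network name is arbitrary):

# create a shared network
docker network create kafka_demo_net

# start each container attached to that network, e.g. ZooKeeper:
docker run -d --name zookeeper --network kafka_demo_net \
  -e ALLOW_ANONYMOUS_LOGIN=yes docker.io/bitnami/zookeeper:3.8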

docker-compose up -d

Creating network "kafka_demo_default" with the default driver
Creating volume "kafka_demo_zookeeper_data" with local driver
Creating volume "kafka_demo_kafka_data" with local driver
Creating postgres  ... done
Creating zookeeper ... done
Creating kafka     ... done
Creating connect   ... done

Verify that everything is up and green in Docker Desktop.

The containers now running are Kafka, ZooKeeper, Debezium Connect, and PostgreSQL.
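You can also check from the command line; all four services should report an Up state:

docker-compose ps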


Database:

Create a Schema and a Table in the PostgreSQL Database running in the docker container:

Schema: outbox
Table: mytable
Columns:
id int (Primary Key)
message varchar
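For example, via psql inside the container (docker exec -it postgres psql -U postgres):

CREATE SCHEMA outbox;

CREATE TABLE outbox.mytable (
    id int PRIMARY KEY,
    message varchar
);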

Insert some sample records into this table:

INSERT INTO outbox.mytable
(id, message)
VALUES(8, 'hello world9');


Debezium Connector REST:

Use the Kafka Connect REST API (exposed by the connect container on port 8083) to register a connector and specify that we need Debezium to monitor mytable and send Events to a Topic.

Let's do a GET first to see if we have any registered connectors:

GET localhost:8083/connectors

No connectors registered, so we should register a connector:

POST localhost:8083/connectors

The JSON payload is below. Use POST; if successful, you will receive a 201 Created status (a curl example follows the property notes below).

{
  "name": "mytable-connector",  
  "config": {  
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",  
    "plugin.name": "pgoutput",  
    "database.hostname": "postgres",  
    "database.dbname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "schema.include.list": "outbox",
    "table.include.list": "mytable",
    "signal.data.collection": "outbox.mytable",
    "topic.prefix": "mytopic.prefix",  
    "database.include.list": "postgres",  
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"  
  }
}
  • plugin.name: pgoutput –> pgoutput is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community, and used by PostgreSQL itself for logical replication.

    This plug-in is always present, so no additional libraries need to be installed.
    The Debezium connector interprets the raw replication event stream directly into change events.
  • schema.include.list –> The schema you want to monitor.
  • table.include.list –> The table inside your schema that you want to monitor, fully qualified as <schemaName>.<tableName>. An unqualified table name matches nothing, and the connector will warn that no changes will be captured.
  • signal.data.collection –> Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name: <schemaName>.<tableName>
  • topic.prefix –> The prefix for the topics the Connector creates; events for each table go to a topic named <topic.prefix>.<schemaName>.<tableName> (here, mytopic.prefix.outbox.mytable).
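For example, with curl (assuming the payload above is saved locally as mytable-connector.json; the file name is arbitrary):

# list registered connectors
curl -s localhost:8083/connectors

# register the connector (expect HTTP/1.1 201 Created)
curl -i -X POST -H "Content-Type: application/json" \
  --data @mytable-connector.json localhost:8083/connectors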

Debezium Connector Log:

After registering the mytable-connector, Debezium will take a snapshot of the data that already exists in the table.

connect | 2023-01-29 09:57:40,637 INFO   Postgres|outbox.mytable.events|postgres-connector-task  Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/1543970}, catalogXmin=737]   [io.debezium.connector.postgresql.connection.PostgresConnection]
connect | 2023-01-29 09:57:40,638 INFO   Postgres|outbox.mytable.events|postgres-connector-task  No previous offset found   [io.debezium.connector.postgresql.PostgresConnectorTask]
connect | 2023-01-29 09:57:40,638 INFO   Postgres|outbox.mytable.events|postgres-connector-task  Requested thread factory for connector PostgresConnector, id = outbox.mytable.events named = change-event-source-coordinator   [io.debezium.util.Threads]
connect | 2023-01-29 09:57:40,639 INFO   Postgres|outbox.mytable.events|postgres-connector-task  Creating thread debezium-postgresconnector-outbox.mytable.events-change-event-source-coordinator   [io.debezium.util.Threads]
connect | 2023-01-29 09:57:40,639 INFO   ||  WorkerSourceTask{id=mytable-connector-0} Source task finished initialization and start   [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
connect | 2023-01-29 09:57:40,640 INFO   Postgres|outbox.mytable.events|snapshot  Metrics registered   [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,641 INFO   Postgres|outbox.mytable.events|snapshot  Context created   [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,641 INFO   Postgres|outbox.mytable.events|snapshot  Taking initial snapshot for new datasource   [io.debezium.connector.postgresql.snapshot.InitialSnapshotter]
connect | 2023-01-29 09:57:40,641 INFO   Postgres|outbox.mytable.events|snapshot  According to the connector configuration data will be snapshotted   [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,641 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot step 1 - Preparing   [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,668 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot step 2 - Determining captured tables   [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,670 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot step 3 - Locking captured tables []   [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,670 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot step 4 - Determining snapshot offset   [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,670 INFO   Postgres|outbox.mytable.events|snapshot  Creating initial offset context   [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,671 INFO   Postgres|outbox.mytable.events|snapshot  Read xlogStart at 'LSN{0/1543AB8}' from transaction '738'   [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,672 INFO   Postgres|outbox.mytable.events|snapshot  Read xlogStart at 'LSN{0/1543AB8}' from transaction '738'   [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,672 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot step 5 - Reading structure of captured tables   [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,686 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot step 6 - Persisting schema history   [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,686 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot step 7 - Snapshotting data   [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,687 INFO   Postgres|outbox.mytable.events|snapshot  Snapshotting contents of 0 tables while still in transaction   [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,687 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot - Final stage   [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,687 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot completed   [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,687 INFO   Postgres|outbox.mytable.events|snapshot  Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='outbox.mytable.events'db='postgres', lsn=LSN{0/1543AB8}, txId=738, timestamp=2023-01-29T09:57:40.672Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=true, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]]   [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,687 WARN   Postgres|outbox.mytable.events|snapshot  After applying the include/exclude list filters, no changes will be captured. Please check your configuration!   [io.debezium.relational.RelationalDatabaseSchema]
connect | 2023-01-29 09:57:40,687 INFO   Postgres|outbox.mytable.events|streaming  Connected metrics set to 'true'   [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,702 INFO   Postgres|outbox.mytable.events|streaming  No incremental snapshot in progress, no action needed on start   [io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,702 INFO   Postgres|outbox.mytable.events|streaming  Starting streaming   [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,703 INFO   Postgres|outbox.mytable.events|streaming  Retrieved latest position from stored offset 'LSN{0/1543AB8}'   [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
connect | 2023-01-29 09:57:40,703 INFO   Postgres|outbox.mytable.events|streaming  Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/1543AB8}'   [io.debezium.connector.postgresql.connection.WalPositionLocator]
connect | 2023-01-29 09:57:40,703 INFO   Postgres|outbox.mytable.events|streaming  Initializing PgOutput logical decoder publication   [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
connect | 2023-01-29 09:57:40,719 INFO   Postgres|outbox.mytable.events|streaming  Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/1543970}, catalogXmin=737]   [io.debezium.connector.postgresql.connection.PostgresConnection]
connect | 2023-01-29 09:57:40,721 INFO   Postgres|outbox.mytable.events|streaming  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]
connect | 2023-01-29 09:57:40,722 INFO   Postgres|outbox.mytable.events|streaming  Seeking to LSN{0/1543AB8} on the replication slot with command SELECT pg_replication_slot_advance('debezium', '0/1543AB8')   [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
postgres | 2023-01-29 09:57:40.722 UTC [2859] LOG:  starting logical decoding for slot "debezium"
postgres | 2023-01-29 09:57:40.722 UTC [2859] DETAIL:  Streaming transactions committing after 0/1543970, reading WAL from 0/1543938.
postgres | 2023-01-29 09:57:40.722 UTC [2859] STATEMENT:  SELECT pg_replication_slot_advance('debezium', '0/1543AB8')
postgres | 2023-01-29 09:57:40.722 UTC [2859] LOG:  logical decoding found consistent point at 0/1543938
postgres | 2023-01-29 09:57:40.722 UTC [2859] DETAIL:  There are no running transactions.
postgres | 2023-01-29 09:57:40.722 UTC [2859] STATEMENT:  SELECT pg_replication_slot_advance('debezium', '0/1543AB8')
postgres | 2023-01-29 09:57:40.733 UTC [2859] LOG:  starting logical decoding for slot "debezium"
postgres | 2023-01-29 09:57:40.733 UTC [2859] DETAIL:  Streaming transactions committing after 0/1543AB8, reading WAL from 0/1543998.
postgres | 2023-01-29 09:57:40.733 UTC [2859] STATEMENT:  START_REPLICATION SLOT "debezium" LOGICAL 0/1543AB8 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')
postgres | 2023-01-29 09:57:40.733 UTC [2859] LOG:  logical decoding found consistent point at 0/1543998
postgres | 2023-01-29 09:57:40.733 UTC [2859] DETAIL:  There are no running transactions.
postgres | 2023-01-29 09:57:40.733 UTC [2859] STATEMENT:  START_REPLICATION SLOT "debezium" LOGICAL 0/1543AB8 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')
connect | 2023-01-29 09:57:40,743 INFO   Postgres|outbox.mytable.events|streaming  Requested thread factory for connector PostgresConnector, id = outbox.mytable.events named = keep-alive   [io.debezium.util.Threads]
connect | 2023-01-29 09:57:40,744 INFO   Postgres|outbox.mytable.events|streaming  Creating thread debezium-postgresconnector-outbox.mytable.events-keep-alive   [io.debezium.util.Threads]
connect | 2023-01-29 09:57:40,758 INFO   Postgres|outbox.mytable.events|streaming  Searching for WAL resume position   [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]

Once the snapshot is done, every committed insert, update, or delete on the table will send an Event to the Kafka Topic. (Note the WARN line in the log above: the connector prints "no changes will be captured" when table.include.list does not use the fully-qualified <schemaName>.<tableName> form.)

If we create a consumer and take a look at this topic, we can see the message that we have previously inserted:
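For example, with the console consumer shipped inside the Kafka container (the script path assumes the Bitnami image layout; the topic name follows the <topic.prefix>.<schemaName>.<tableName> convention):

docker exec -it kafka /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic mytopic.prefix.outbox.mytable \
  --from-beginning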


Considerations:

The default plugin.name used by the Debezium PostgreSQL connector is "decoderbufs", so if that plug-in is not installed on your PostgreSQL server, replication slot creation fails with the below exception. This is why the configuration above sets plugin.name to pgoutput.

postgres | 2023-01-27 14:41:17.396 UTC [69] ERROR:  could not access file "decoderbufs": No such file or directory
postgres | 2023-01-27 14:41:17.396 UTC [69] STATEMENT:  CREATE_REPLICATION_SLOT "debezium"  LOGICAL decoderbufs
connect | 2023-01-27 14:41:17,399 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
connect | io.debezium.DebeziumException: Creation of replication slot failed
connect |       at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:143)
connect |       at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:136)
connect |       at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:270)
connect |       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:187)
connect |       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
connect |       at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
connect |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect |       at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
connect |       at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
connect |       at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
connect |       at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
connect |       at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
connect |       at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
connect |       at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:333)
connect |       at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:319)
connect |       at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:295)
connect |       at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:290)
connect |       at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:425)
connect |       at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:136)
