The Debezium connector captures row-level changes in a database and publishes them as events to your Kafka cluster, without the need to write custom code.
If you need change data capture (CDC) in an event-driven architecture, consider Debezium.
Debezium supports JSON out of the box, or you can configure a different converter, for example for Avro serialization.
The Debezium PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. For information about the PostgreSQL versions that are compatible with the connector, see the Debezium release overview.
https://debezium.io/documentation/reference/stable/connectors/postgresql.html
The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to a PostgreSQL database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.
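To make the per-table topic naming concrete, here is a minimal Python sketch. It assumes the connector's default naming scheme, where the topic prefix, schema name and table name are joined by dots. The helper name topic_for is made up for this illustration, and the values are the ones used later in this post.

```python
def topic_for(topic_prefix: str, schema: str, table: str) -> str:
    """Default Debezium PostgreSQL topic name: <topic.prefix>.<schema>.<table>."""
    return ".".join([topic_prefix, schema, table])


print(topic_for("mytopic.prefix", "outbox", "mytable"))
# prints: mytopic.prefix.outbox.mytable
```

A consumer that wants the change events for one table would subscribe to the topic name this helper returns.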
The Architecture of Debezium is described here.
We will use Docker to set up all the required components locally. In a real environment, you could deploy on Kubernetes/OpenShift, or run Debezium embedded within your application, although the embedded mode does not offer the same level of fault tolerance and reliability as running it with a Kafka cluster.
Setup:
Docker Containers:
Create a docker-compose.yml file with the below contents:
version: "3"
services:
  zookeeper:
    container_name: zookeeper
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    container_name: kafka
    image: docker.io/bitnami/kafka:3.3
    ports:
      - "9092:9092"
      - "9094:9094"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
    depends_on:
      - zookeeper
  connect:
    container_name: connect
    image: quay.io/debezium/connect:2.1
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - kafka
  postgres:
    container_name: postgres
    image: postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
Run docker-compose up -d.
Docker Compose will create a default network named after the directory you ran docker-compose from, with a _default suffix (kafka_demo_default in the output below). This common network allows all four containers to communicate with each other.
If you run each container separately, you will need to create a network yourself and connect all the containers to it (or use the legacy docker --link option).
docker-compose up -d
Creating network "kafka_demo_default" with the default driver
Creating volume "kafka_demo_zookeeper_data" with local driver
Creating volume "kafka_demo_kafka_data" with local driver
Creating postgres ... done
Creating zookeeper ... done
Creating kafka ... done
Creating connect ... done
Verify that all the containers are up and running (shown in green in Docker Desktop on Windows).
The containers installed are Kafka, ZooKeeper, the Debezium connector (Kafka Connect), and PostgreSQL.
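If you prefer to check from a script rather than a UI, a rough sketch like the one below can test whether the published ports accept TCP connections. The port numbers are taken from the docker-compose.yml above; the function names and the choice of a plain socket check are my own assumptions, and an open port only shows that the container is listening, not that the service inside is fully ready.

```python
import socket

# Host ports published by the docker-compose.yml in this post.
# Kafka is checked on 9094 because that listener is advertised as localhost:9094.
SERVICES = {
    "zookeeper": 2181,
    "kafka (OUTSIDE listener)": 9094,
    "connect (REST API)": 8083,
    "postgres": 5432,
}


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host: str = "localhost") -> dict:
    """Map each service name to whether its published port is reachable."""
    return {name: port_is_open(host, port) for name, port in SERVICES.items()}
```

Calling check_services() once the stack is up should report True for every entry; a False value points at the container to inspect with docker logs.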
Database:
Create a schema and a table in the PostgreSQL database running in the Docker container:
Schema: outbox
Table: mytable
Columns:
id int (Primary Key)
message varchar
Insert some sample records into this table:
INSERT INTO outbox.mytable
(id, message)
VALUES(8, 'hello world9');
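The schema, table and sample row described above could also be created from a script. The sketch below is one possible way, not the only one: it works with any DB-API 2.0 connection that uses the %s parameter style, and the function name setup_outbox is made up for this example. Using the psycopg2 driver for the real connection is an assumption on my part; it is not part of the setup in this post.

```python
# DDL matching the description in this post: schema "outbox", table "mytable".
DDL_STATEMENTS = [
    "CREATE SCHEMA IF NOT EXISTS outbox",
    """CREATE TABLE IF NOT EXISTS outbox.mytable (
        id      integer PRIMARY KEY,
        message varchar
    )""",
]

INSERT_SQL = "INSERT INTO outbox.mytable (id, message) VALUES (%s, %s)"


def setup_outbox(conn, rows):
    """Create the schema and table, insert the given rows, then commit.

    conn is any DB-API 2.0 connection using the '%s' parameter style
    (assumption: for example one returned by psycopg2.connect).
    """
    cur = conn.cursor()
    try:
        for statement in DDL_STATEMENTS:
            cur.execute(statement)
        cur.executemany(INSERT_SQL, rows)
    finally:
        cur.close()
    conn.commit()
```

With psycopg2 installed, the connection for the container above would be opened with psycopg2.connect(host="localhost", port=5432, dbname="postgres", user="postgres", password="postgres") and passed in as setup_outbox(conn, [(8, "hello world9")]).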
Debezium Connector REST:
Use the Debezium REST API to register a connector and specify that we need Debezium to monitor mytable and send Events to a Topic.
Let's do a GET first to see if we have any registered connectors: GET localhost:8083/connectors
No connectors are registered yet, so we register one: POST localhost:8083/connectors
The JSON payload is below. Send it with POST; if the request succeeds, you will receive the status 201 Created.
{
  "name": "mytable-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.dbname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "schema.include.list": "outbox",
    "table.include.list": "outbox.mytable",
    "signal.data.collection": "outbox.mytable",
    "topic.prefix": "mytopic.prefix",
    "database.include.list": "postgres",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092"
  }
}
- plugin.name: pgoutput -> pgoutput is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community, and used by PostgreSQL itself for logical replication. This plug-in is always present, so no additional libraries need to be installed. The Debezium connector interprets the raw replication event stream directly into change events.
- schema.include.list -> The schema you want to monitor.
- table.include.list -> The table you want to monitor, given as a fully-qualified name in the format <schemaName>.<tableName>. An entry without the schema part does not match any table.
- signal.data.collection -> Fully-qualified name of the data collection that is used to send signals to the connector. Use the following format to specify the collection name: <schemaName>.<tableName>
- topic.prefix -> The prefix for the names of the topics that the connector creates.
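The GET and POST calls described above can also be made from a small script instead of a REST client. The following is a minimal sketch using only the Python standard library; the function names are hypothetical, and the URL assumes the Connect REST API is published on localhost:8083 as in the docker-compose.yml of this post.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"


def build_register_request(name, config, url=CONNECT_URL):
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def register_connector(name, config, url=CONNECT_URL):
    """Send the registration request and return the HTTP status code.

    201 is expected on success. urlopen raises urllib.error.HTTPError for
    4xx/5xx responses, for example when the connector name already exists.
    """
    with urllib.request.urlopen(build_register_request(name, config, url)) as resp:
        return resp.status


def list_connectors(url=CONNECT_URL):
    """Return the names of the registered connectors (GET /connectors)."""
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)
```

Passing the "name" and "config" values from the JSON payload to register_connector should return 201, after which list_connectors() should include mytable-connector.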
Debezium Connector Log:
After registering the mytable-connector, Debezium will take a snapshot of the data that already exists in the table.
connect | 2023-01-29 09:57:40,637 INFO Postgres|outbox.mytable.events|postgres-connector-task Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/1543970}, catalogXmin=737] [io.debezium.connector.postgresql.connection.PostgresConnection]
connect | 2023-01-29 09:57:40,638 INFO Postgres|outbox.mytable.events|postgres-connector-task No previous offset found [io.debezium.connector.postgresql.PostgresConnectorTask]
connect | 2023-01-29 09:57:40,638 INFO Postgres|outbox.mytable.events|postgres-connector-task Requested thread factory for connector PostgresConnector, id = outbox.mytable.events named = change-event-source-coordinator [io.debezium.util.Threads]
connect | 2023-01-29 09:57:40,639 INFO Postgres|outbox.mytable.events|postgres-connector-task Creating thread debezium-postgresconnector-outbox.mytable.events-change-event-source-coordinator [io.debezium.util.Threads]
connect | 2023-01-29 09:57:40,639 INFO || WorkerSourceTask{id=mytable-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
connect | 2023-01-29 09:57:40,640 INFO Postgres|outbox.mytable.events|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,641 INFO Postgres|outbox.mytable.events|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,641 INFO Postgres|outbox.mytable.events|snapshot Taking initial snapshot for new datasource [io.debezium.connector.postgresql.snapshot.InitialSnapshotter]
connect | 2023-01-29 09:57:40,641 INFO Postgres|outbox.mytable.events|snapshot According to the connector configuration data will be snapshotted [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,641 INFO Postgres|outbox.mytable.events|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,668 INFO Postgres|outbox.mytable.events|snapshot Snapshot step 2 - Determining captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,670 INFO Postgres|outbox.mytable.events|snapshot Snapshot step 3 - Locking captured tables [] [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,670 INFO Postgres|outbox.mytable.events|snapshot Snapshot step 4 - Determining snapshot offset [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,670 INFO Postgres|outbox.mytable.events|snapshot Creating initial offset context [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,671 INFO Postgres|outbox.mytable.events|snapshot Read xlogStart at 'LSN{0/1543AB8}' from transaction '738' [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,672 INFO Postgres|outbox.mytable.events|snapshot Read xlogStart at 'LSN{0/1543AB8}' from transaction '738' [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,672 INFO Postgres|outbox.mytable.events|snapshot Snapshot step 5 - Reading structure of captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,686 INFO Postgres|outbox.mytable.events|snapshot Snapshot step 6 - Persisting schema history [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,686 INFO Postgres|outbox.mytable.events|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,687 INFO Postgres|outbox.mytable.events|snapshot Snapshotting contents of 0 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,687 INFO Postgres|outbox.mytable.events|snapshot Snapshot - Final stage [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,687 INFO Postgres|outbox.mytable.events|snapshot Snapshot completed [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,687 INFO Postgres|outbox.mytable.events|snapshot Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='outbox.mytable.events'db='postgres', lsn=LSN{0/1543AB8}, txId=738, timestamp=2023-01-29T09:57:40.672Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=true, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,687 WARN Postgres|outbox.mytable.events|snapshot After applying the include/exclude list filters, no changes will be captured. Please check your configuration! [io.debezium.relational.RelationalDatabaseSchema]
connect | 2023-01-29 09:57:40,687 INFO Postgres|outbox.mytable.events|streaming Connected metrics set to 'true' [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,702 INFO Postgres|outbox.mytable.events|streaming No incremental snapshot in progress, no action needed on start [io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource]
connect | 2023-01-29 09:57:40,702 INFO Postgres|outbox.mytable.events|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
connect | 2023-01-29 09:57:40,703 INFO Postgres|outbox.mytable.events|streaming Retrieved latest position from stored offset 'LSN{0/1543AB8}' [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
connect | 2023-01-29 09:57:40,703 INFO Postgres|outbox.mytable.events|streaming Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/1543AB8}' [io.debezium.connector.postgresql.connection.WalPositionLocator]
connect | 2023-01-29 09:57:40,703 INFO Postgres|outbox.mytable.events|streaming Initializing PgOutput logical decoder publication [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
connect | 2023-01-29 09:57:40,719 INFO Postgres|outbox.mytable.events|streaming Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/1543970}, catalogXmin=737] [io.debezium.connector.postgresql.connection.PostgresConnection]
connect | 2023-01-29 09:57:40,721 INFO Postgres|outbox.mytable.events|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
connect | 2023-01-29 09:57:40,722 INFO Postgres|outbox.mytable.events|streaming Seeking to LSN{0/1543AB8} on the replication slot with command SELECT pg_replication_slot_advance('debezium', '0/1543AB8') [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
postgres | 2023-01-29 09:57:40.722 UTC [2859] LOG: starting logical decoding for slot "debezium"
postgres | 2023-01-29 09:57:40.722 UTC [2859] DETAIL: Streaming transactions committing after 0/1543970, reading WAL from 0/1543938.
postgres | 2023-01-29 09:57:40.722 UTC [2859] STATEMENT: SELECT pg_replication_slot_advance('debezium', '0/1543AB8')
postgres | 2023-01-29 09:57:40.722 UTC [2859] LOG: logical decoding found consistent point at 0/1543938
postgres | 2023-01-29 09:57:40.722 UTC [2859] DETAIL: There are no running transactions.
postgres | 2023-01-29 09:57:40.722 UTC [2859] STATEMENT: SELECT pg_replication_slot_advance('debezium', '0/1543AB8')
postgres | 2023-01-29 09:57:40.733 UTC [2859] LOG: starting logical decoding for slot "debezium"
postgres | 2023-01-29 09:57:40.733 UTC [2859] DETAIL: Streaming transactions committing after 0/1543AB8, reading WAL from 0/1543998.
postgres | 2023-01-29 09:57:40.733 UTC [2859] STATEMENT: START_REPLICATION SLOT "debezium" LOGICAL 0/1543AB8 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')
postgres | 2023-01-29 09:57:40.733 UTC [2859] LOG: logical decoding found consistent point at 0/1543998
postgres | 2023-01-29 09:57:40.733 UTC [2859] DETAIL: There are no running transactions.
postgres | 2023-01-29 09:57:40.733 UTC [2859] STATEMENT: START_REPLICATION SLOT "debezium" LOGICAL 0/1543AB8 ("proto_version" '1', "publication_names" 'dbz_publication', "messages" 'true')
connect | 2023-01-29 09:57:40,743 INFO Postgres|outbox.mytable.events|streaming Requested thread factory for connector PostgresConnector, id = outbox.mytable.events named = keep-alive [io.debezium.util.Threads]
connect | 2023-01-29 09:57:40,744 INFO Postgres|outbox.mytable.events|streaming Creating thread debezium-postgresconnector-outbox.mytable.events-keep-alive [io.debezium.util.Threads]
connect | 2023-01-29 09:57:40,758 INFO Postgres|outbox.mytable.events|streaming Searching for WAL resume position [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
Once the snapshot is done, every insert or modification on the table sends an event to the Kafka topic.
If we create a consumer and take a look at this topic, we can see the message that we have previously inserted.
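Whatever consumer you use, the value of each message is a Debezium change event. As a rough sketch of how such a value could be read, the helper below pulls out the operation and the row state. It assumes the JSON converter is in use; the function name summarize_event is made up, and the sample event is hand-written and heavily trimmed for illustration, not output captured from the setup in this post.

```python
import json

# Common Debezium operation codes found in the "op" field of a change event.
OPERATIONS = {
    "c": "create",
    "u": "update",
    "d": "delete",
    "r": "read (snapshot)",
    "t": "truncate",
}


def summarize_event(raw_value):
    """Return a small summary dict for one Debezium change event value."""
    if raw_value is None:
        # Tombstone record (null value), emitted after a delete by default.
        return None
    event = json.loads(raw_value)
    # With schemas enabled in the JSON converter, the data sits under "payload".
    payload = event.get("payload", event)
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload.get("op"), "unknown"),
        "table": f"{source['schema']}.{source['table']}",
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


# Hand-written, trimmed sample event; real events carry many more fields.
SAMPLE = json.dumps({
    "payload": {
        "before": None,
        "after": {"id": 8, "message": "hello world9"},
        "source": {"schema": "outbox", "table": "mytable"},
        "op": "r",
    }
})

print(summarize_event(SAMPLE))
```

Rows read during the initial snapshot carry the operation code r, while later inserts, updates and deletes arrive as c, u and d.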
Considerations:
The default plugin.name used by the Debezium connector is decoderbufs, so if you do not set plugin.name and that plug-in is not installed on your PostgreSQL server, the connector fails with the exception below.
postgres | 2023-01-27 14:41:17.396 UTC [69] ERROR: could not access file "decoderbufs": No such file or directory
postgres | 2023-01-27 14:41:17.396 UTC [69] STATEMENT: CREATE_REPLICATION_SLOT "debezium" LOGICAL decoderbufs
connect | 2023-01-27 14:41:17,399 ERROR || WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
connect | io.debezium.DebeziumException: Creation of replication slot failed
connect | at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:143)
connect | at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:136)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:270)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:187)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
connect | at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect | at java.base/java.lang.Thread.run(Thread.java:829)
connect | Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
connect | at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
connect | at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
connect | at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
connect | at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
connect | at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
connect | at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:333)
connect | at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:319)
connect | at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:295)
connect | at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:290)
connect | at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:425)
connect | at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:136)
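One way to catch this before registering a connector is a small pre-flight check on the configuration. The sketch below is only a heuristic of my own, not something Debezium provides: the function name is hypothetical, and the second check (a table.include.list entry without a schema part) is a simple string test that ignores the fact that entries may be regular expressions.

```python
def find_config_problems(config: dict) -> list:
    """Return human-readable warnings for a PostgreSQL connector config."""
    problems = []

    # Without plugin.name the connector falls back to decoderbufs,
    # which fails as shown above if that plug-in is not installed.
    if not config.get("plugin.name"):
        problems.append("plugin.name is not set, so decoderbufs will be used")

    # Heuristic: entries are expected in the form <schemaName>.<tableName>.
    tables = config.get("table.include.list", "")
    for entry in (t.strip() for t in tables.split(",")):
        if entry and "." not in entry:
            problems.append(f"table.include.list entry '{entry}' has no schema part")

    return problems
```

An empty list means neither of the two checks fired; it does not prove that the rest of the configuration is valid.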