Change Data Capture from Amazon RDS to Apache Kafka® with Debezium
Implement a real-time change data capture workflow from an Amazon Relational Database Service database using Aiven for Apache Kafka®
Amazon RDS offers a PostgreSQL backend for applications. This tutorial shows you how to build a real-time change data capture process to track the changes happening in one (or more) RDS tables and stream them into Apache Kafka®, where multiple consumers can receive the data with minimal latency.
We'll use Aiven for Apache Kafka® as the destination for our streaming data. You'll also need an AWS account to follow along.
Head to the AWS Console and:

- Create a new RDS PostgreSQL database
- Set the database identifier to database-test
- Set the master password to test12345 (Note: for production use cases, use a secure password.)
- Enable public access, so that you can connect to the database from your local machine
Note: you can avoid exposing RDS to public access by connecting it to Apache Kafka via VPC peering, or by running the Aiven for Apache Kafka® service within your AWS account using the Bring Your Own Cloud model.
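If you prefer scripting the setup, a roughly equivalent database can be created with the AWS CLI. This is a minimal sketch assuming the CLI is configured; the instance class and storage size below are illustrative values, so adjust them to your needs:

```bash
# Create a small, publicly accessible PostgreSQL instance on RDS
aws rds create-db-instance \
  --db-instance-identifier database-test \
  --engine postgres \
  --db-instance-class db.t3.micro \
  --allocated-storage 20 \
  --master-username postgres \
  --master-user-password test12345 \
  --publicly-accessible
```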
Once the database is created, we can review the details in the AWS console. In the Connectivity and Security section we can check the database hostname and port.
With the above information, we can connect to RDS with psql:
```
psql postgres://<USERNAME>:<PASSWORD>@<HOSTNAME>:<PORT>/<DATABASE_NAME>
```
Where:
- <USERNAME> is the connection username, postgres if you left the default unchanged
- <PASSWORD> is the connection password, test12345 if you followed the above instructions
- <HOSTNAME> is the database host, taken from the AWS Console
- <PORT> is the database port, taken from the AWS Console
- <DATABASE_NAME> is the database name, by default postgres

If you're using the defaults and have followed the above instructions, the psql call should be:
```
psql postgres://postgres:test12345@<HOSTNAME>:<PORT>/postgres
```
Once connected, we can create a test table and insert some data:
```sql
CREATE TABLE FORNITURE (ID SERIAL, NAME TEXT);
INSERT INTO FORNITURE (NAME) VALUES ('CHAIR'),('TABLE'),('SOFA'),('FRIDGE');
```
A query like SELECT * FROM FORNITURE; should return the following results:
```
 id | name
----+--------
  1 | CHAIR
  2 | TABLE
  3 | SOFA
  4 | FRIDGE
(4 rows)
```
If you don't have an Apache Kafka cluster available, you can create one with Aiven by:

- Creating an Aiven for Apache Kafka® service in the Aiven Console
- Enabling Apache Kafka Connect, which is required to run the Debezium connector
- Enabling kafka.auto_create_topics_enable from the Advanced configuration section to automatically create topics based on the Kafka Connect configurations
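If you prefer the command line, the same settings can be applied with the Aiven CLI. This is a sketch assuming the CLI is installed and logged in, and that the service is named demo-kafka (a placeholder name used here for illustration):

```bash
# Enable Kafka Connect and topic auto-creation on the service
avn service update demo-kafka \
  -c kafka_connect=true \
  -c kafka.auto_create_topics_enable=true
```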
Once Aiven for Apache Kafka is running, the next step is to set up the CDC pipeline. To do so, head to the Connectors tab, select the Debezium for PostgreSQL connector, and include the following configuration:
```json
{
    "name": "mysourcedebezium",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<HOSTNAME>",
    "database.port": "<PORT>",
    "database.user": "postgres",
    "database.password": "<PASSWORD>",
    "database.dbname": "postgres",
    "database.server.name": "mydebprefix",
    "plugin.name": "pgoutput",
    "slot.name": "mydeb_slot",
    "publication.name": "mydeb_pub",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.forniture"
}
```
Where:
- database.hostname, database.port, database.password are the RDS connection parameters found in the AWS Console
- database.server.name is the prefix for the topic names in Aiven for Apache Kafka
- plugin.name is the PostgreSQL plugin name, pgoutput
- slot.name and publication.name are the names of the replication slot and publication in PostgreSQL
- "publication.autocreate.mode": "filtered" creates a publication only for the tables in scope
- table.include.list lists the tables for which we want to enable CDC

After replacing the placeholders in the JSON configuration file with the connection parameters defined above, you can create and start the connector.
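As an alternative to the Console, the connector can also be created with the Aiven CLI. A sketch, assuming the JSON configuration above is saved in a file named debezium_source.json (a hypothetical filename) and the Kafka service is named demo-kafka:

```bash
# Create the Debezium source connector from the JSON configuration file
avn service connector create demo-kafka @debezium_source.json
```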
If you hit the error below:
```
There was an error in the configuration.
database.hostname: Postgres server wal_level property must be "logical" but is: replica
```
You'll need to enable logical replication. On Amazon RDS, this is done by setting the rds.logical_replication parameter to 1 in a parameter group attached to the instance and then rebooting the database. To verify the change, run show wal_level; from a terminal connected to the PostgreSQL database: it should report the wal_level as logical.
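For instance, once logical replication is enabled, the check from psql should return:

```
postgres=> show wal_level;
 wal_level
-----------
 logical
(1 row)
```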
Once the connector is up and running, you should see a topic named mydebprefix.public.forniture, the concatenation of the database.server.name parameter and the RDS schema and table name. To check the data in the topic in the Aiven Console:
- Open the Topics tab of the Aiven for Apache Kafka service
- Select the mydebprefix.public.forniture topic
- Click the Fetch Messages button

You should see the same dataset you previously pushed to RDS appearing in JSON format in the topic.
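Each record follows Debezium's change event envelope, carrying before and after images of the row plus an op field describing the operation. Below is a trimmed sketch of the payload for the CHAIR row; real messages also include schema and source metadata, and op "r" marks a row read during the initial snapshot:

```json
{
    "before": null,
    "after": {"id": 1, "name": "CHAIR"},
    "op": "r"
}
```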
If we perform an insert, delete and update using psql in the terminal connected to RDS, for example:
```sql
INSERT INTO FORNITURE (NAME) VALUES ('REFRIGERATOR');
DELETE FROM FORNITURE WHERE NAME='FRIDGE';
UPDATE FORNITURE SET NAME='COUCH' WHERE NAME='SOFA';
```
You might get the following error:
```
ERROR: cannot delete from table "forniture" because it does not have a replica identity and publishes deletes
HINT: To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.
```
To solve the problem, you can enable full replica identity on the table with:
```sql
ALTER TABLE FORNITURE REPLICA IDENTITY FULL;
```
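To verify that the setting took effect, you can inspect the relreplident column of the pg_class catalog, which should now report f for full:

```sql
SELECT relname, relreplident
FROM pg_class
WHERE relname = 'forniture';
```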
Once the statements succeed, you should be able to see the corresponding change events in the Aiven for Apache Kafka UI by clicking the Fetch Messages button again.
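With full replica identity enabled, delete events also carry the complete previous state of the row in the before field, which is what the connector needed above. A trimmed sketch of the payload for the FRIDGE deletion (op "d" marks a delete), again omitting schema and source metadata:

```json
{
    "before": {"id": 4, "name": "FRIDGE"},
    "after": null,
    "op": "d"
}
```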
Setting up a change data capture process from an RDS PostgreSQL database to Apache Kafka with the Debezium connector is a powerful way to stream inserts, updates and deletes to one or more consumers in real time.