Change Data Capture from Amazon RDS to Apache Kafka® with Debezium

Implement a real-time change data capture workflow from an Amazon Relational Database Service database using Aiven for Apache Kafka®

  • Data
  • Apache Kafka®
  • Integration
  • AWS

Amazon RDS offers a PostgreSQL backend for applications. This tutorial shows you how to build a real-time change data capture process that tracks the changes happening in one or more RDS tables and streams them into Apache Kafka®, where multiple consumers can receive the data with minimal latency.

We'll use Aiven for Apache Kafka® as the destination for our streaming data. You'll also need an AWS account to follow along.

Create AWS RDS database

Head to the AWS Console and:

  • Navigate to the Products tab
  • Select Databases
  • Click on RDS
  • Click on Create Database

Create database button

  • Select PostgreSQL
  • Give the database a name like database-test
  • In the Availability and durability section, select Single DB instance (for the purpose of this tutorial, the Multi-AZ DB instance and Multi-AZ DB Cluster are functionally equivalent)
  • In the Settings section, change the master password to test12345

Note: For production use cases, use a secure password.

  • In the Connectivity section, select a VPC with an internet gateway attached and enable Public Access. Also check that inbound traffic is allowed from the IP address you'll connect to RDS from.

Note: you can avoid exposing RDS to public access by connecting it to Apache Kafka via VPC peering, or by running the Aiven for Apache Kafka® service inside your AWS account with the Bring Your Own Cloud model.
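
If you prefer scripting the setup, the same database can be created with the AWS CLI. Here's a minimal sketch mirroring the choices above; the instance class and storage size are assumptions for a small test setup:

    # Create a publicly accessible PostgreSQL instance named database-test
    # (db.t3.micro and 20 GiB of storage are assumptions, not requirements)
    aws rds create-db-instance \
      --db-instance-identifier database-test \
      --engine postgres \
      --master-username postgres \
      --master-user-password test12345 \
      --db-instance-class db.t3.micro \
      --allocated-storage 20 \
      --publicly-accessible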

Populate AWS RDS database

Once the database is created, we can review the details in the AWS console. In the Connectivity and Security section we can check the database hostname and port.

AWS Console showing the database details

With the above information, we can connect to RDS with psql:

    psql postgres://<USERNAME>:<PASSWORD>@<HOSTNAME>:<PORT>/<DATABASE_NAME>

Where:

  • <USERNAME> is the connection username, postgres if you left the default unchanged
  • <PASSWORD> is the connection password, test12345 if you followed the above instructions
  • <HOSTNAME> is the database host, taken from the AWS Console
  • <PORT> is the database port, taken from the AWS Console
  • <DATABASE_NAME> is the database name, by default postgres

If you're using the defaults and have followed the above instructions, the psql call should be:

    psql postgres://postgres:test12345@<HOSTNAME>:<PORT>/postgres

Once connected, we can create a test table and insert some data:

    CREATE TABLE FORNITURE (ID SERIAL, NAME TEXT);
    INSERT INTO FORNITURE (NAME) VALUES ('CHAIR'),('TABLE'),('SOFA'),('FRIDGE');

A query like SELECT * FROM FORNITURE; should return the following results:

     id |  name
    ----+--------
      1 | CHAIR
      2 | TABLE
      3 | SOFA
      4 | FRIDGE
    (4 rows)

Create an Aiven for Apache Kafka® service with Kafka Connect enabled

If you don't have an Apache Kafka cluster available, you can create one with Aiven:

  • Navigate to the Aiven Console
  • Click on Create service
  • Select Apache Kafka®
  • Select the cloud and region where the service will be deployed. Selecting the same cloud region where your RDS database is located will minimize latency.
  • Access the Aiven for Apache Kafka service page and enable:
    • Kafka Connect to perform the change data capture
    • REST API to browse the data from the Aiven Console
    • kafka.auto_create_topics_enable from the Advanced configuration section to automatically create topics based on the Kafka Connect configurations

Apache Kafka REST APIs and Kafka Connect enabled
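
If you prefer the command line, the same options can be toggled with the Aiven CLI. A minimal sketch, assuming the avn CLI is installed and authenticated, and that your service is named my-kafka:

    # Enable Kafka Connect, the REST API, and automatic topic creation
    # on an existing service (my-kafka is an assumed service name)
    avn service update my-kafka \
      -c kafka_connect=true \
      -c kafka_rest=true \
      -c kafka.auto_create_topics_enable=true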

Create a Change Data Capture process with the Debezium Connector

Once Aiven for Apache Kafka is running, the next step is to set up the CDC pipeline. To do so, head to the Connectors tab, select the Debezium for PostgreSQL connector, and include the following configuration:

    {
        "name": "mysourcedebezium",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<HOSTNAME>",
        "database.port": "<PORT>",
        "database.user": "postgres",
        "database.password": "<PASSWORD>",
        "database.dbname": "postgres",
        "database.server.name": "mydebprefix",
        "plugin.name": "pgoutput",
        "slot.name": "mydeb_slot",
        "publication.name": "mydeb_pub",
        "publication.autocreate.mode": "filtered",
        "table.include.list": "public.forniture"
    }

Where:

  • database.hostname, database.port, and database.password are the RDS connection parameters found in the AWS Console
  • database.server.name is the prefix for the topic names in Aiven for Apache Kafka
  • plugin.name is the PostgreSQL plugin name, pgoutput
  • slot.name and publication.name are the names of the replication slot and the publication that Debezium will use in PostgreSQL (you can verify both once the connector is running, as sketched below)
  • "publication.autocreate.mode": "filtered" creates a publication only for the tables in scope
  • table.include.list lists the tables for which we want to enable CDC
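
Later, once the connector is running, you can double-check from psql that Debezium created the slot and the publication. A minimal sketch using the standard PostgreSQL catalog views (mydeb_slot and mydeb_pub come from the configuration above):

    # Verify the replication slot and publication created by Debezium
    psql postgres://postgres:test12345@<HOSTNAME>:<PORT>/postgres \
      -c "SELECT slot_name, plugin, active FROM pg_replication_slots;" \
      -c "SELECT * FROM pg_publication_tables WHERE pubname = 'mydeb_pub';"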

After replacing the placeholders in the JSON configuration with the connection parameters defined above, you can start the connector by:

  • Navigating to the Aiven Console
  • Navigating to the Aiven for Apache Kafka service page
  • Clicking on the Connectors tab
  • Clicking on New Connector
  • Selecting the Debezium - PostgreSQL connector
  • Pasting the JSON configuration defined above into the connector configuration editor
  • Clicking on Create Connector
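
As an alternative to the Console steps, you can manage connectors over the standard Kafka Connect REST API. A minimal sketch, assuming the configuration above is saved as connector.json and that <CONNECT_SERVICE_URI> is replaced with the https://user:password@host:port address of your Kafka Connect service from the Aiven Console:

    # Create (or update) the connector via the Kafka Connect REST API;
    # PUT /connectors/<name>/config accepts the flat configuration above
    curl -s -X PUT \
      -H "Content-Type: application/json" \
      -d @connector.json \
      <CONNECT_SERVICE_URI>/connectors/mysourcedebezium/config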

If you hit the following error:

    There was an error in the configuration. database.hostname: Postgres server wal_level property must be "logical" but is: replica

You'll need to enable logical replication. On RDS, this is done by attaching a custom DB parameter group with the rds.logical_replication parameter set to 1 and rebooting the instance, as sketched below. To verify the setting, run SHOW wal_level; from a terminal connected to the PostgreSQL database: it should report logical.
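
On RDS, wal_level can't be changed directly with ALTER SYSTEM; it's controlled through a DB parameter group. A minimal sketch with the AWS CLI (the parameter group name is an assumption, and the family must match your PostgreSQL engine version):

    # Create a custom parameter group (postgres14 is an assumed family)
    aws rds create-db-parameter-group \
      --db-parameter-group-name pg-logical \
      --db-parameter-group-family postgres14 \
      --description "Enable logical replication"

    # rds.logical_replication is a static parameter, applied at reboot
    aws rds modify-db-parameter-group \
      --db-parameter-group-name pg-logical \
      --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"

    # Attach the group to the instance and reboot for it to take effect
    aws rds modify-db-instance \
      --db-instance-identifier database-test \
      --db-parameter-group-name pg-logical
    aws rds reboot-db-instance --db-instance-identifier database-test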

Check the changes in Apache Kafka

Once the connector is up and running, you should see a topic named mydebprefix.public.forniture, the concatenation of the database.server.name parameter and the RDS schema and table name. To check the data in the topic in the Aiven Console:

  • Navigate to the Topics tab
  • Click on the mydebprefix.public.forniture topic
  • Click on Messages
  • Click on Fetch Messages
  • Enable the Decode from base64 option

You should see the same dataset you previously pushed to RDS appearing in JSON format in the topic.

CDC data appearing in Apache Kafka
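
If you'd rather follow the topic from a terminal, a consumer tool such as kcat works too. A minimal sketch, assuming you've downloaded the service's ca.pem, service.cert and service.key files from the Aiven Console (Aiven for Apache Kafka uses TLS client certificates by default):

    # Consume the CDC topic from the beginning and keep following it
    kcat -C -b <KAFKA_HOST>:<KAFKA_PORT> \
      -t mydebprefix.public.forniture \
      -X security.protocol=SSL \
      -X ssl.ca.location=ca.pem \
      -X ssl.certificate.location=service.cert \
      -X ssl.key.location=service.key \
      -o beginning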

Now perform an insert, a delete and an update using psql in the terminal connected to RDS, for example:

    INSERT INTO FORNITURE (NAME) VALUES ('REFRIGERATOR');
    DELETE FROM FORNITURE WHERE NAME='FRIDGE';
    UPDATE FORNITURE SET NAME='COUCH' WHERE NAME='SOFA';

You might get the following error:

    ERROR: cannot delete from table "forniture" because it does not have a replica identity and publishes deletes
    HINT: To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.

To solve the problem, enable full replica identity on the table with:

    ALTER TABLE FORNITURE REPLICA IDENTITY FULL;

Once the statements above succeed, we should be able to see the changes in the Aiven for Apache Kafka UI by clicking the Fetch Messages button again.

The last three changes being visualized in Aiven for Apache Kafka topic

Conclusion

Setting up a change data capture process from an RDS PostgreSQL database to Apache Kafka with the Debezium connector is a powerful way to stream inserts, updates, and deletes to one or more consumers in real time.

Some more resources if you are interested:

  • Debezium PostgreSQL connector parameters
  • PostgreSQL replica identity
  • Limits of the JDBC source connector
