Debezium source connector - PostgreSQL® to Apache Kafka®
Use Debezium as a source connector to integrate PostgreSQL® and Apache Kafka® and use Terraform to deploy. A part of Aiven's Terraform Cookbook.
The Aiven Terraform Provider is a great choice for provisioning an Aiven for Apache Kafka® cluster with Kafka Connect enabled and the Debezium source connector for PostgreSQL® configured.
Describe the setup
This Terraform recipe provisions one Aiven for PostgreSQL® database service, one Aiven for Apache Kafka service, and a separate Aiven for Apache Kafka Connect service with a Debezium source connector for PostgreSQL configured to connect to the PostgreSQL database and capture any changes in its tables. In this recipe all three services are deployed in Google Cloud (google-europe-north1), but Aiven makes it very easy to configure services in different clouds that integrate seamlessly: you could, for example, run the Kafka service on Azure instead by changing only its cloud_name.
As soon as any of the monitored tables receives an insert or update, the Debezium connector captures the data change, converts the table data into a JSON payload, and produces messages to the relevant Kafka topic.
Warning
Aiven provides the option to run Kafka Connect on the same nodes as your Kafka cluster, sharing their resources. This is a low-cost way to get started with Kafka Connect. A standalone Aiven for Apache Kafka® Connect service, on the other hand, allows you to scale independently, offers more CPU time and memory for the Kafka Connect service, and reduces the load on the Kafka nodes, making the cluster more stable.
Be sure to check out the getting started guide to learn about the common files required to execute the following recipe. For example, you'll need to declare the variables for project_name and aiven_api_token.
Common files
Navigate to a new folder and add the following files.
Add the following to a new provider.tf file:
terraform {
  required_providers {
    aiven = {
      source  = "aiven/aiven"
      version = ">=4.0.0, < 5.0.0"
    }
  }
}

provider "aiven" {
  api_token = var.aiven_api_token
}
You can also set the environment variable TF_VAR_aiven_api_token for the api_token property. With this, you don't need to pass the -var-file flag when executing Terraform commands.
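For example, a minimal sketch (the values are placeholders; TF_VAR_project_name works the same way for the project_name variable):

export TF_VAR_aiven_api_token="<YOUR-AIVEN-AUTHENTICATION-TOKEN-GOES-HERE>"
export TF_VAR_project_name="<YOUR-AIVEN-CONSOLE-PROJECT-NAME-GOES-HERE>"
terraform plan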
To avoid including sensitive information in source control, the variables are defined in the variables.tf file. You can then use a *.tfvars file with the actual values so that Terraform receives them at runtime, and exclude that file from source control (for example, by adding *.tfvars to .gitignore).
The variables.tf file defines the API token and the project name to use:
variable "aiven_api_token" { description = "Aiven console API token" type = string } variable "project_name" { description = "Aiven console project name" type = string }
The var-values.tfvars file holds the actual values and is passed to Terraform using the -var-file= flag:
aiven_api_token = "<YOUR-AIVEN-AUTHENTICATION-TOKEN-GOES-HERE>" project_name = "<YOUR-AIVEN-CONSOLE-PROJECT-NAME-GOES-HERE>"
services.tf file
The services.tf file provisions the three services, the service integration between Kafka and Kafka Connect, and the Debezium connector:
resource "aiven_pg" "demo-pg" { project = var.project_name service_name = "demo-postgres" cloud_name = "google-europe-north1" plan = "business-4" } resource "aiven_kafka" "demo-kafka" { project = var.project_name cloud_name = "google-europe-north1" plan = "business-4" service_name = "demo-kafka" maintenance_window_dow = "saturday" maintenance_window_time = "10:00:00" kafka_user_config { kafka_rest = true kafka_connect = false schema_registry = true kafka_version = "3.4" kafka { auto_create_topics_enable = true num_partitions = 3 default_replication_factor = 2 min_insync_replicas = 2 } kafka_authentication_methods { certificate = true } } } resource "aiven_kafka_connect" "demo-kafka-connect" { project = var.project_name cloud_name = "google-europe-north1" plan = "business-4" service_name = "demo-kafka-connect" maintenance_window_dow = "monday" maintenance_window_time = "10:00:00" kafka_connect_user_config { kafka_connect { consumer_isolation_level = "read_committed" } public_access { kafka_connect = false } } } resource "aiven_service_integration" "i1" { project = var.project_name integration_type = "kafka_connect" source_service_name = aiven_kafka.demo-kafka.service_name destination_service_name = aiven_kafka_connect.demo-kafka-connect.service_name kafka_connect_user_config { kafka_connect { group_id = "connect" status_storage_topic = "__connect_status" offset_storage_topic = "__connect_offsets" } } } resource "aiven_kafka_connector" "kafka-pg-source" { project = var.project_name service_name = aiven_kafka_connect.demo-kafka-connect.service_name connector_name = "kafka-pg-source" config = { "name" = "kafka-pg-source" "connector.class" = "io.debezium.connector.postgresql.PostgresConnector" "snapshot.mode" = "initial" "database.hostname" = sensitive(aiven_pg.demo-pg.service_host) "database.port" = sensitive(aiven_pg.demo-pg.service_port) "database.password" = sensitive(aiven_pg.demo-pg.service_password) "database.user" = sensitive(aiven_pg.demo-pg.service_username) "database.dbname" = "defaultdb" "database.server.name" = "replicator" "database.ssl.mode" = "require" "include.schema.changes" = true "include.query" = true "table.include.list" = "public.tab1" "plugin.name" = "pgoutput" "publication.autocreate.mode" = "filtered" "decimal.handling.mode" = "double" "_aiven.restart.on.failure" = "true" "heartbeat.interval.ms" = 30000 "heartbeat.action.query" = "INSERT INTO heartbeat (status) VALUES (1)" } depends_on = [aiven_service_integration.i1] }
Execute the files
The init command performs several initialization steps to prepare the current working directory for use with Terraform. In this case, the command automatically finds, downloads, and installs the necessary Aiven Terraform provider plugin.
terraform init
The plan command creates an execution plan and shows you the resources that will be created (or modified). This command does not actually create any resources; it's a preview.
terraform plan -var-file=var-values.tfvars
If you're satisfied with the output of terraform plan, go ahead and run the terraform apply command, which actually does the work of creating (or modifying) your infrastructure resources.
terraform apply -var-file=var-values.tfvars
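Once the apply completes, you can check that the connector is up and running. One way to do so, assuming you have the Aiven CLI installed and authenticated (the service and connector names match this recipe):

avn service connector status demo-kafka-connect kafka-pg-source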
Let's go over a few of these configurations and understand their functions:
- The auto_create_topics_enable = true property allows the Debezium connector to send messages to a topic that does not exist yet; Kafka creates the topic automatically.
- The kafka_connect = false property is used because we want to create a separate Aiven for Apache Kafka Connect service instead of running Kafka Connect on the Kafka nodes.
- The resource aiven_service_integration.i1 configures the integration between the Aiven for Apache Kafka service and the Aiven for Apache Kafka Connect service. This integration uses two internal topics for storing status and offsets:
  - group_id under kafka_connect_user_config is a unique ID that identifies the Kafka Connect cluster.
  - status_storage_topic and offset_storage_topic identify the internal Kafka topics that store the connector status and the connector offsets, respectively.
- snapshot.mode = "initial" means that, upon first connection, the connector snapshots the entire structure and data of the linked database. After this snapshot, it captures and streams row-level changes. The initial snapshot ensures no data is lost in transition, but it might put some load on the database when first activated.
- plugin.name = "pgoutput" specifies the logical decoding output plugin that Debezium uses for PostgreSQL. pgoutput is the standard output plugin in recent PostgreSQL versions and is supported natively, without needing additional extensions.
- publication.autocreate.mode = "filtered" means that the connector will either use an existing logical replication publication with the specified name or create one if it doesn't exist. The "filtered" mode ensures that only changes to the tables specified in the configuration (table.include.list) are included in the publication.
- The Debezium source connector for PostgreSQL listens for all data changes on one or more tables, including schema changes. In this recipe, the monitored table is tab1 in the public schema of the defaultdb database (a SQL sketch of creating the required tables follows this list). The connector reads WAL events (WAL stands for Write-Ahead Logging) through the pgoutput plugin and converts them into JSON payloads that are sent to a Kafka topic via the Kafka Connect service. The topic the Debezium connector creates is named replicator.public.tab1, where "replicator" is the logical server name set by database.server.name (used as the topic prefix), and "public" and "tab1" are the PostgreSQL schema and table names respectively.
- The depends_on property establishes an explicit dependency so that the connector is created only after the service integration exists, avoiding failures.
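The connector configuration above assumes that the monitored table public.tab1 and the heartbeat table referenced by heartbeat.action.query already exist in defaultdb. A minimal SQL sketch of creating them and triggering a first change event (the columns of tab1 are illustrative assumptions; only heartbeat's status column is implied by the configured heartbeat query):

-- Monitored table; any column layout works, these columns are placeholders
CREATE TABLE tab1 (
  id   SERIAL PRIMARY KEY,
  name TEXT
);

-- Target of heartbeat.action.query: INSERT INTO heartbeat (status) VALUES (1)
CREATE TABLE heartbeat (
  status INTEGER
);

-- Inserting a row produces a change event on the replicator.public.tab1 topic
INSERT INTO tab1 (name) VALUES ('hello');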
More resources
Keep in mind that some parameters and configurations will vary for your use case. Here is a reference to some of the advanced Apache Kafka configurations and other related resources: