Enabling change data capture from MySQL to Apache Kafka® with Debezium
Implement a change data capture workflow in Apache Kafka®, a key component of any organization with high data integrity requirements.
Change Data Capture (CDC) is the process of tracking the changes happening in one or more database tables in a source system and propagating them to a target technology. The objective of this tutorial is to create a CDC flow from a source table in a MySQL database to Apache Kafka® via the Kafka Connect Debezium Source connector.
Debezium 2.5
This article describes the configuration for Debezium version 2.5 and later.

To set up a change data capture process tracking the changes from a MySQL database to Apache Kafka, we can use Kafka Connect, a framework that integrates Apache Kafka with external systems.
There are two types of Kafka Connect connectors we can use to set up a CDC process between MySQL and Apache Kafka:

- the JDBC source connector, which periodically queries the source tables and copies the results in batches
- the Debezium source connector, which reads the changes from the MySQL binlog

While both connectors can move MySQL data into Apache Kafka, the Debezium connector tracks the changes in streaming mode (not in a query-based batch mode). This allows us to enhance the extracted data with useful metadata like transaction IDs, timestamps, and pre/post update details.
The following tutorial guides you through setting up a CDC process using the Debezium connector.
The first step in setting up a CDC process is to create the source MySQL database and the target Apache Kafka cluster with Kafka Connect enabled. We'll use Aiven for MySQL and Aiven for Apache Kafka® to do this.
Log in with the Aiven CLI:

```shell
avn user login
```
Create an Aiven for MySQL service named demo-mysql-source:
```shell
avn service create demo-mysql-source \
  --service-type mysql \
  --plan free-1-5gb \
  --cloud aws-eu-west-1
```
Create an Aiven for Apache Kafka service named demo-kafka with Kafka Connect and Schema Registry enabled:
```shell
avn service create demo-kafka \
  --service-type kafka \
  --plan business-4 \
  --cloud aws-eu-west-1 \
  -c kafka_connect=true \
  -c schema_registry=true \
  -c kafka.auto_create_topics_enable=true
```
Wait for the services to start with:
```shell
avn service wait demo-mysql-source
avn service wait demo-kafka
```
Once the services are up and running, we can create the table in MySQL in which we'll track the changes. To create and populate the table:
Get the Aiven for MySQL connection details:
```shell
avn service get demo-mysql-source --format '{service_uri_params}'
```
Use the mysql command to connect to the database. Replace the placeholders for <MYSQL_USERNAME>, <MYSQL_PASSWORD>, <MYSQL_HOSTNAME>, <MYSQL_PORT> and <MYSQL_DATABASE_NAME> with the values taken from the command above. Alternatively, you can find the pre-filled mysql command from the Quick Connect button in the Aiven Console service overview.
```shell
mysql --user <MYSQL_USERNAME> \
  --password=<MYSQL_PASSWORD> \
  --host <MYSQL_HOSTNAME> \
  --port <MYSQL_PORT> \
  <MYSQL_DATABASE_NAME>
```
Create a table called users with a numeric id column and a username string column, and populate it with three rows for Francesco, Ana, and Floor:
```sql
create table users (id serial primary key, username varchar(100));
insert into users (username) values ('Francesco'),('Ana'),('Floor');
```
Verify that the above commands are successful by checking the users table with:
```sql
select * from users;
```
The result should be similar to the following:

```text
+----+-----------+
| id | username  |
+----+-----------+
|  1 | Francesco |
|  2 | Ana       |
|  3 | Floor     |
+----+-----------+
```
To create the CDC pipeline, we need to define the settings of the Kafka Connect Debezium Source connector. These settings contain:
As done in the section above, we can use the avn service get command to retrieve the Aiven for MySQL parameters:
```shell
avn service get demo-mysql-source --format '{service_uri_params}'
```
We can do the same to retrieve the Apache Kafka Schema Registry parameters needed for the schema integration. You can fetch the connection details with:
```shell
avn service get demo-kafka --json | jq '.connection_info.schema_registry_uri'
```
Note
The above command uses jq to parse the avn command output and retrieve the connection URI, which will be in the format https://<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>@<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>. You'll need to parse this information to fill the Kafka Connect connector configuration file in the following section.

To retrieve the Apache Kafka endpoint, which we'll use to track the DDL changes in a topic:
```shell
avn service get demo-kafka --format '{service_uri}'
```
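If you prefer not to split the Schema Registry URI by eye, the parsing can be scripted. Here's a minimal Python sketch using urllib.parse; the sample URI below is a made-up placeholder, not a real service endpoint:

```python
from urllib.parse import urlsplit

# Example value in the format returned by the avn command above:
# https://<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>@<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>
uri = "https://avnadmin:secret@demo-kafka-project.aivencloud.com:13045"

parts = urlsplit(uri)

# The two pieces the connector configuration needs.
config = {
    "schema.registry.url": f"https://{parts.hostname}:{parts.port}",
    "basic.auth.user.info": f"{parts.username}:{parts.password}",
}
print(config)
```

This splits the userinfo (credentials) from the host and port so you can paste each part into the right configuration key.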
In a file named mysql_source_deb_connector.json, add the following JSON to define the Debezium connector configuration:
```json
{
    "name": "<CONNECTOR_NAME>",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<MYSQL_HOSTNAME>",
    "database.port": "<MYSQL_PORT>",
    "database.user": "<MYSQL_USERNAME>",
    "database.password": "<MYSQL_PASSWORD>",
    "database.dbname": "<MYSQL_DATABASE_NAME>",
    "database.ssl.mode": "<MYSQL_SSL_MODE>",
    "database.server.id": "<UNIQUE_ID>",
    "topic.prefix": "<KAFKA_TOPIC_PREFIX>",
    "table.include.list": "<MYSQL_TABLES>",
    "tasks.max": "<NR_TASKS>",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>",
    "schema.history.internal.kafka.topic": "<HISTORY_TOPIC_NAME>",
    "schema.history.internal.kafka.bootstrap.servers": "<APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT>",
    "schema.history.internal.producer.security.protocol": "SSL",
    "schema.history.internal.producer.ssl.keystore.type": "PKCS12",
    "schema.history.internal.producer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
    "schema.history.internal.producer.ssl.keystore.password": "password",
    "schema.history.internal.producer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
    "schema.history.internal.producer.ssl.truststore.password": "password",
    "schema.history.internal.producer.ssl.key.password": "password",
    "schema.history.internal.consumer.security.protocol": "SSL",
    "schema.history.internal.consumer.ssl.keystore.type": "PKCS12",
    "schema.history.internal.consumer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
    "schema.history.internal.consumer.ssl.keystore.password": "password",
    "schema.history.internal.consumer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
    "schema.history.internal.consumer.ssl.truststore.password": "password",
    "schema.history.internal.consumer.ssl.key.password": "password",
    "include.schema.changes": "true"
}
```
We need to replace the following values:

- <CONNECTOR_NAME> is the name we're giving this connector. For this tutorial, set it to mysql_source_deb_connector.
- <MYSQL_USERNAME>, <MYSQL_PASSWORD>, <MYSQL_HOSTNAME>, <MYSQL_PORT>, <MYSQL_DATABASE_NAME> and <MYSQL_SSL_MODE> are the Aiven for MySQL connection parameters retrieved in the previous section.
- <UNIQUE_ID> is a number, which must be different for each connector running. For this tutorial, set it to 12345.
- <KAFKA_TOPIC_PREFIX> will be used as a prefix for all the Kafka topics that this connector sends events to. For this tutorial, set it to example.
- <MYSQL_TABLES> is the list of MySQL tables to capture changes from. This parameter is optional. The default is the empty string, which means capture all tables. For this tutorial, set it to defaultdb.users, the one table we have. (Note that there's also a parameter called table.exclude.list for the opposite effect.)
- <NR_TASKS> should be set to 1, since the MySQL connector always uses a single task. This is actually the default value, so the tasks.max parameter can also be omitted.
- <HISTORY_TOPIC_NAME> is the name of the Apache Kafka topic that contains the history of schema changes. For this tutorial, set it to ddl_history.
- <APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT> and <SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD> are the Schema Registry endpoint and credentials retrieved in the previous section.
- <APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT> is the Apache Kafka service URI retrieved in the previous section.

See the Debezium MySQL connector documentation for more information.
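With this many placeholders it's easy to miss one. As a sanity check, you can script the substitution and verify nothing is left unreplaced; this is a minimal sketch (the trimmed template and sample values are illustrative, not the full configuration file):

```python
import json
import re

# A trimmed-down template; the real file contains many more keys.
template = """{
  "name": "<CONNECTOR_NAME>",
  "database.hostname": "<MYSQL_HOSTNAME>",
  "database.port": "<MYSQL_PORT>",
  "topic.prefix": "<KAFKA_TOPIC_PREFIX>"
}"""

# Example values; replace with the ones from your own services.
values = {
    "CONNECTOR_NAME": "mysql_source_deb_connector",
    "MYSQL_HOSTNAME": "demo-mysql-source.aivencloud.com",
    "MYSQL_PORT": "13044",
    "KAFKA_TOPIC_PREFIX": "example",
}

filled = template
for key, value in values.items():
    filled = filled.replace(f"<{key}>", value)

# Fail loudly if any <PLACEHOLDER> survived the substitution.
leftover = re.findall(r"<[A-Z_]+>", filled)
assert not leftover, f"unreplaced placeholders: {leftover}"

# Parsing also confirms the result is still valid JSON.
config = json.loads(filled)
print(config["name"])
```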
Once the parameters are substituted, we can start the Debezium source connector with:
```shell
avn service connector create demo-kafka @mysql_source_deb_connector.json
```
Since we set the name parameter to mysql_source_deb_connector, the above starts a connector with that name.
We can check its status with:
```shell
avn service connector status demo-kafka mysql_source_deb_connector
```
The status should be RUNNING.
```json
{
    "status": {
        "state": "RUNNING",
        "tasks": [
            {
                "id": 0,
                "state": "RUNNING",
                "trace": ""
            }
        ]
    }
}
```
If the connector is up and running, we can check that the data is flowing from MySQL to Apache Kafka with kcat. Get the kcat parameters and download the certificates needed to connect with:
```shell
avn service connection-info kcat demo-kafka -u avnadmin -W
```
The above command stores the three credential files (ca.pem, service.crt and service.key) in the current local folder and returns the command to execute.
To verify the data in Apache Kafka we can use the following command, substituting the placeholders for <KAFKA_HOST>, <KAFKA_PORT>, <KAFKA_SCHEMA_REGISTRY_USR>, <KAFKA_SCHEMA_REGISTRY_PWD>, <KAFKA_HOST>, and <KAFKA_SCHEMA_REGISTRY_PORT>.
```shell
kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
  -X security.protocol=SSL \
  -X ssl.ca.location=ca.pem \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.crt \
  -C -t example.defaultdb.users \
  -s avro \
  -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>
```
Note
The above command is consuming (-C flag) from the topic example.defaultdb.users (-t flag) in Apache Avro format (-s flag). The topic name is determined by the concatenation of the connector parameter topic.prefix (example) and the table name (defaultdb.users).

After executing the kcat command, the data in Apache Kafka should be similar to the following:
```text
{"id": 1}{"before": null, "after": {"Value": {"id": 1, "username": {"string": "Francesco"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "first"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275527}, "transaction": null}
{"id": 2}{"before": null, "after": {"Value": {"id": 2, "username": {"string": "Ana"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275530}, "transaction": null}
{"id": 3}{"before": null, "after": {"Value": {"id": 3, "username": {"string": "Floor"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "last"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275531}, "transaction": null}
% Reached end of topic example.defaultdb.users [0] at offset 3
```
Not only do we get the three values stored in the users table, but we also get additional useful metadata like when the insert/update/deletion happened (in the ts_ms parameter) and the position in the binlog file.
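To make that metadata concrete, here's a small Python sketch that pulls the operation type, commit timestamp, and binlog position out of a change event. The event dict below is hand-built and simplified (the real Avro output wraps some fields in union types like {"string": "users"}):

```python
from datetime import datetime, timezone

# Operation codes used in Debezium change events.
OPS = {"r": "snapshot read", "c": "insert", "u": "update", "d": "delete"}

def describe(event: dict) -> str:
    """Summarize a (simplified) Debezium change event."""
    source = event["source"]
    when = datetime.fromtimestamp(event["ts_ms"] / 1000, tz=timezone.utc)
    return (f"{OPS[event['op']]} on {source['db']}.{source['table']} "
            f"at {when:%Y-%m-%d %H:%M:%S} UTC "
            f"(binlog {source['file']}:{source['pos']})")

# A simplified event, modeled on the kcat output above.
event = {
    "op": "c",
    "ts_ms": 1733226538394,
    "source": {"db": "defaultdb", "table": "users",
               "file": "binlog.000290", "pos": 420},
}
print(describe(event))
```

The same pattern works for any downstream consumer that needs to audit when and where each change was committed.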
If we now add a new value to the MySQL users table with:
```sql
insert into users(username) values ('Carlo');
```
We should see it immediately pop up in the Apache Kafka topic:
```text
{"id": 4}{"before": null, "after": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226538000, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:39"}, "file": "binlog.000290", "pos": 420, "row": 0, "thread": {"long": 20671}, "query": null}, "op": "c", "ts_ms": {"long": 1733226538394}, "transaction": null}
% Reached end of topic example.defaultdb.users [0] at offset 4
```
If we now remove the row containing Carlo with:
```sql
delete from users where username='Carlo';
```
We see the corresponding delete ("op": "d") in Apache Kafka:
```text
{"id": 4}{"before": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "after": null, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226572000, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:40"}, "file": "binlog.000290", "pos": 725, "row": 0, "thread": {"long": 20671}, "query": null}, "op": "d", "ts_ms": {"long": 1733226572691}, "transaction": null}
{"id": 4}
% Reached end of topic example.defaultdb.users [0] at offset 6
```
Where is the MySQL schema stored? There are two topics:
- ddl_history (named by the schema.history.internal.kafka.topic parameter), which stores the schema history for Debezium's inner workings, so that Debezium can resume work if it crashes
- example (based on the topic.prefix parameter), where the connector stores all the DDLs needed to replicate the state in another MySQL database

We can fetch the latter topic with kcat:
```shell
kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
  -X security.protocol=SSL \
  -X ssl.ca.location=ca.pem \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.crt \
  -C -t example \
  -s avro \
  -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>
```
We get the full list of DDLs needed to replicate the state into a new MySQL database.
```text
{"databaseName": ""}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226274272, "snapshot": {"string": "true"}, "db": "", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275042, "databaseName": {"string": ""}, "schemaName": null, "ddl": {"string": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275044, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275086, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP TABLE IF EXISTS `defaultdb`.`users`"}, "tableChanges": [{"type": "DROP", "id": "\"defaultdb\".\"users\"", "table": null}]}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275092, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275098, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP DATABASE IF EXISTS `defaultdb`"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275098, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275107, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE DATABASE `defaultdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275107, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275109, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "USE `defaultdb`"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275124, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275197, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE TABLE \"users\" (\n \"id\" bigint unsigned NOT NULL AUTO_INCREMENT,\n \"username\" varchar(100) DEFAULT NULL,\n PRIMARY KEY (\"id\"),\n UNIQUE KEY \"id\" (\"id\")\n)"}, "tableChanges": [{"type": "CREATE", "id": "\"defaultdb\".\"users\"", "table": {"Table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}], "comment": null}}}]}
% Reached end of topic example [0] at offset 6
```
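Note that the stream above includes destructive statements (DROP TABLE, DROP DATABASE) alongside the CREATE statements. If you intend to replay these DDLs against an existing database, you might filter those out first; here's a minimal, hypothetical Python sketch (the record dicts mirror the ddl field of the messages above, simplified):

```python
# DDL statements extracted from the `example` topic (simplified records).
records = [
    {"ddl": "DROP TABLE IF EXISTS `defaultdb`.`users`"},
    {"ddl": "DROP DATABASE IF EXISTS `defaultdb`"},
    {"ddl": "CREATE DATABASE `defaultdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci"},
    {"ddl": "CREATE TABLE \"users\" (\"id\" bigint unsigned NOT NULL AUTO_INCREMENT)"},
]

# Keywords we treat as destructive for replay purposes.
DESTRUCTIVE = ("DROP", "TRUNCATE")

def safe_ddls(records):
    """Keep only statements that don't start with a destructive keyword."""
    return [r["ddl"] for r in records
            if not r["ddl"].lstrip().upper().startswith(DESTRUCTIVE)]

for ddl in safe_ddls(records):
    print(ddl)
```

This is a coarse prefix check, not a SQL parser; review the filtered list before running it anywhere.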
Warning
The response also contains database truncation statements which might delete other tables present in the same target database.

If we now add a column to the source MySQL database with:
```sql
alter table users add column country varchar(3);
```
We get a new entry in the example topic defining the DDL change we just implemented:
```text
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226759134, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:41"}, "file": "binlog.000291", "pos": 274, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226759153, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "alter table users add column country varchar(3)"}, "tableChanges": [{"type": "ALTER", "id": "\"defaultdb\".\"users\"", "table": {"Table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "country", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 3}, "scale": null, "position": 3, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}], "comment": null}}}]}
% Reached end of topic example [0] at offset 7
```
The Debezium connector for Apache Kafka tracks, in real time, all the changes happening in one or more MySQL tables into a series of topics by reading the binlog. The messages in Apache Kafka contain the updated data and are enriched with metadata like transaction IDs and timestamps. Furthermore, the use of a dedicated topic to track DDL changes gives additional control over the table structures, which is useful for replicating the database structure when needed.
You can dig deeper with the following resources: