Enabling change data capture from MySQL to Apache Kafka® with Debezium

Implement a change data capture workflow in Apache Kafka®, a key component of any organization with high data integrity requirements.

Change Data Capture (CDC) is the process of tracking the changes happening in one or more database table in a source system and propagating them to a target technology. The objective of this tutorial is to create a CDC flow from a source table in a MySQL database to Apache Kafka® via the Kafka Connect Debezium Source connector.

Change Data Capture from MySQL to Apache Kafka via Kafka Connect

In order to setup a change data capture process tracking the changes from a MySQL database to Apache Kafka, we can use Kafka Connect, a framework which integrates Apache Kafka with external systems.

There are two types of Kafka Connect connectors we can use to set up a CDC process between MySQL and Apache Kafka:

  • The JDBC Source connector, a query based approach which periodically pulls the latest changes from MySQL into Kafka.
  • The Debezium Source connector, which reads from a MySQL binary log (binlog) and streams the changes to Apache Kafka.

While both connectors can move MySQL data into Apache Kafka, the Debezium connector tracks the changes in streaming mode (not in a query based batch mode). This allows us to enhance the extracted data with useful metadata like transaction, IDs and timestamps, and pre-post update details.

The following tutorial guides you through setting up a CDC process using the Debezium connector.

Environment setup

The first step into setting up a CDC process is to create the source MySQL database and the target Apache Kafka cluster with Kafka Connect enabled. We'll use Aiven for MySQL and Aiven for Apache Kafka® to do this.

  1. Create an Aiven account
  2. Install the Aiven Command Line Interface (CLI)
  3. Login into the Aiven CLI using:
avn user login

Create a Aiven for MySQL service named demo-mysql-source:

avn service create demo-mysql-source \ --service-type mysql \ --plan free-1-5gb \ --cloud aws-eu-west-1

Create an Aiven for Apache Kafka service named demo-kafka with Kafka Connect and Schema Registry enabled:

avn service create demo-kafka \ --service-type kafka \ --plan business-4 \ --cloud aws-eu-west-1 \ -c kafka_connect=true \ -c schema_registry=true \ -c kafka.auto_create_topics_enable=true

Wait for the services to start with:

avn service wait demo-mysql-source avn service wait demo-kafka

Create and populate the MySQL table

Once the services are up and running, we can create the table in MySQL in which we'll track the changes. To create and populate the table:

Get the Aiven for MySQL connection details:

avn service get demo-mysql-source --format '{service_uri_params}'

Use the mysql command to connect to the database. Replace the placeholders for <MYSQL_USERNAME>, <MYSQL_PASSWORD>, <MYSQL_HOSTNAME>, <MYSQL_PORT> and <MYSQL_DATABASE_NAME> with the values taken from the command above. You can find the pre-filled mysql command from the Quick Connect button in the Aiven Console, service overview tab:

mysql --user <MYSQL_USERNAME> \ --password=<MYSQL_PASSWORD> \ --host <MYSQL_HOSTNAME> \ --port <MYSQL_PORT> \ <MYSQL_DATABASE_NAME>

Note

You can install the mysql command line tool using the MySQL installation guide.

Create a table called users with a numeric id column, a username string and populate with three rows for Francesco, Ana, and Floor

create table users (id serial primary key, username varchar(100)); insert into users (username) values ('Francesco'),('Ana'),('Floor');

Verify that the above commands are successful by checking the users table with:

select * from users;

The result should be similar to the below:

+----+-----------+ | id | username | +----+-----------+ | 1 | Francesco | | 2 | Ana | | 3 | Floor | +----+-----------+

Create a Kafka Connect Connector

To create the CDC pipeline, we need to define the settings of the Kafka Connect Debezium Source connector. These settings contain:

  • The source MySQL connection parameters, which allow the connector to fetch the data
  • The target Apache Kafka connection parameters, which allow the connector to store the versioning of the table structure. This step is optional, but it might be useful to track DDL changes to be able to replicate changes to a target MySQL database.
  • The target Apache Kafka schema registry connection parameters, which allow the connector to define schemas and store the data in Apache Avro™ format.

Get the connection parameters

As done in the section above, we can use the avn service get command to retrieve the Aiven for MySQL parameters:

avn service get demo-mysql-source --format '{service_uri_params}'

We can do the same to retrieve the Apache Kafka Schema Registry parameters needed for the schema integration. You can fetch the connection details with:

avn service get demo-kafka --json | jq '.connection_info.schema_registry_uri'

Note

Note: the above command uses jq to parse the avn command output and retrieve the connection URI which will be in the format https://<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>@<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>. You'll need to parse the information to fill the Kafka Connect connector configuration file in the following section.

To retrieve the Apache Kafka endpoint, which we'll use to track the DDL changes in a topic:

avn service get demo-kafka --format '{service_uri}'

Define the Debezium source connector configuration file

In a file named mysql_source_deb_connector.json add the following JSON configuration to define the Debezium connector configuration:

{ "name":"mysql_source_deb_connector", "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "<MYSQL_HOST>", "database.port": "<MYSQL_PORT>", "database.user": "<MYSQL_USER>", "database.password": "<MYSQL_PASSWORD>", "database.dbname": "<MYSQL_DATABASE_NAME>", "database.sslmode": "<MYSQL_SSL_MODE>", "database.server.name": "mysql_source", "table.include.list": "defaultdb.users", "tasks.max":"1", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>", "key.converter.basic.auth.credentials.source": "USER_INFO", "key.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>", "value.converter.basic.auth.credentials.source": "USER_INFO", "value.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>", "database.history.kafka.topic": "ddl_history", "database.history.kafka.bootstrap.servers": "<APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT>", "database.history.producer.security.protocol": "SSL", "database.history.producer.ssl.keystore.type": "PKCS12", "database.history.producer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12", "database.history.producer.ssl.keystore.password": "password", "database.history.producer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks", "database.history.producer.ssl.truststore.password": "password", "database.history.producer.ssl.key.password": "password", "database.history.consumer.security.protocol": "SSL", "database.history.consumer.ssl.keystore.type": "PKCS12", "database.history.consumer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12", "database.history.consumer.ssl.keystore.password": "password", "database.history.consumer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks", "database.history.consumer.ssl.truststore.password": "password", "database.history.consumer.ssl.key.password": "password", "include.schema.changes": "true" }

We also need to replace the following parameters:

  • The MySQL connection details: <MYSQL_USERNAME>, <MYSQL_PASSWORD>, <MYSQL_HOSTNAME>, <MYSQL_PORT> and <MYSQL_DATABASE_NAME> and <MYSQL_SSL_MODE>
  • The Apache Kafka schema registry details: <APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT> and <SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>
  • The Apache Kafka details: <APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT>

Start the Debezium source connector

Once the parameters are substituted, we can start the Debezium source connector with:

avn service connector create demo-kafka @mysql_source_deb_connector.json

The above starts a connector named mysql_source_deb_connector.

We can check its status with:

avn service connector status demo-kafka mysql_source_deb_connector

The status should be RUNNING.

{ "status": { "state": "RUNNING", "tasks": [ { "id": 0, "state": "RUNNING", "trace": "" } ] } }

Check the data in Apache Kafka with kcat

If the connector is up and running, we can check that the data is flowing from MySQL to Apache Kafka with kcat. Get the kcat parameters and download the certificates needed to connect with:

avn service connection-info kcat demo-kafka -u avnadmin -W

The above command will store the three certificates (ca.pem, service.crt and service.key) in the current local folder and return the command to execute.

To verify the data in Apache Kafka we can use the following command, substituting the placeholders for <KAFKA_HOST>, <KAFKA_PORT>, <KAFKA_SCHEMA_REGISTRY_USR>, <KAFKA_SCHEMA_REGISTRY_PWD>, <KAFKA_HOST>, and <KAFKA_SCHEMA_REGISTRY_PORT>.

kcat -b <KAFKA_HOST>:<KAFKA_PORT> \ -X security.protocol=SSL \ -X ssl.ca.location=ca.pem \ -X ssl.key.location=service.key \ -X ssl.certificate.location=service.crt \ -C -t mysql_source.defaultdb.users \ -s avro \ -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>

Note

Note: the above command is consuming (-C flag) from the topic mysql_source.defaultdb.users (-t flag) in Apache Avro format (-s flag). The topic name is driven by the concatenation of the database.server.name connector parameter and the schema and table name.

After executing the kcat command, the data in Apache Kafka should be similar to the following:

{"id": 1}{"before": null, "after": {"Value": {"id": 1, "username": {"string": "Francesco"}}}, "source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198113000, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1690198113195}, "transaction": null} {"id": 2}{"before": null, "after": {"Value": {"id": 2, "username": {"string": "Ana"}}}, "source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198113000, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1690198113205}, "transaction": null} {"id": 3}{"before": null, "after": {"Value": {"id": 3, "username": {"string": "Floor"}}}, "source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198113000, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1690198113205}, "transaction": null}

Not only do we get the three values stored in the users table, but we also get additional useful metadata like when the insert/update/deletion happened (in the ts_ms parameter) and the position in the binlog file.

If we now add a new value in MySQL users table with:

insert into users(username) values ('Carlo');

We should see it immediately pop up in the Apache Kafka topic:

{"id": 4}{"before": null, "after": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198113000, "snapshot": {"string": "last"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": {"long": 1690198113206}, "transaction": null}

If we now remove the row containing Carlo with:

delete from users where username='Carlo';

We see the corresponding delete ("op": "d") in Apache Kafka:

{"id": 4}{"before": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "after": null, "source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198657000, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 3397386443, "gtid": {"string": "c6619363-2a0a-11ee-b45f-06c229911f02:31"}, "file": "binlog.000019", "pos": 420, "row": 0, "thread": {"long": 1049}, "query": null}, "op": "d", "ts_ms": {"long": 1690198657182}, "transaction": null} {"id": 4}

Altering the schema

Where is the MySQL schema stored? There's two topics, one called ddl_history (based on the database.history.kafka.topic parameter) for the Debezium connector internal workings, the other named mysql_source (based on the database.server.name parameter) where the connector stores all the DDLs needed to replicate the state in another MySQL database. ddl_history stores the schema for Debezium's inner workings so that Debezium can resume work if it crashes, and mysql_source stores all the DDLs needed to replicate the state in another mySQL database.

If we fetch the latter topic with kcat:

kcat -b <KAFKA_HOST>:<KAFKA_PORT> \ -X security.protocol=SSL \ -X ssl.ca.location=ca.pem \ -X ssl.key.location=service.key \ -X ssl.certificate.location=service.crt \ -C -t mysql_source \ -s avro \ -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>

We get the full list of DDLs needed to replicate the state into a new MySQL database.

{"databaseName": ""}{"source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198111962, "snapshot": {"string": "true"}, "db": "", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "databaseName": {"string": ""}, "schemaName": null, "ddl": {"string": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci"}, "tableChanges": []} {"databaseName": "defaultdb"}{"source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198112701, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP TABLE IF EXISTS `defaultdb`.`users`"}, "tableChanges": []} {"databaseName": "defaultdb"}{"source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198112732, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP DATABASE IF EXISTS `defaultdb`"}, "tableChanges": []} {"databaseName": "defaultdb"}{"source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198112744, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE DATABASE `defaultdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci"}, "tableChanges": []} {"databaseName": "defaultdb"}{"source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198112752, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "USE `defaultdb`"}, "tableChanges": []} {"databaseName": "defaultdb"}{"source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690198112757, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000017", "pos": 502, "row": 0, "thread": null, "query": null}, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE TABLE \"users\" (\n \"id\" bigint unsigned NOT NULL AUTO_INCREMENT,\n \"username\" varchar(100) DEFAULT NULL,\n PRIMARY KEY (\"id\"),\n UNIQUE KEY \"id\" (\"id\")\n)"}, "tableChanges": [{"type": "CREATE", "id": "\"defaultdb\".\"users\"", "table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null}], "comment": null}}]}

Warning

The response also contains database truncation statements which might delete other tables present in the same target database.

If we now add a column to the source MySQL database with:

alter table users add column country varchar(3);

We get a new entry in the mysql_source topic defining the DDL change we just implemented:

{"databaseName": "defaultdb"}{"source": {"version": "1.9.7.aiven", "connector": "mysql", "name": "mysql_source", "ts_ms": 1690199104377, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 3397386443, "gtid": {"string": "c6619363-2a0a-11ee-b45f-06c229911f02:32"}, "file": "binlog.000021", "pos": 274, "row": 0, "thread": null, "query": null}, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "alter table users add column country varchar(3)"}, "tableChanges": [{"type": "ALTER", "id": "\"defaultdb\".\"users\"", "table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null}, {"name": "country", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 3}, "scale": null, "position": 3, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null}], "comment": null}}]}

Conclusion

The Debezium connector for Apache Kafka tracks, in real time, all the changes happening in one or more MySQL tables into a series of topics by reading the binlog. The messages in Apache Kafka contain the update data and are also enriched with metadata information like the transaction ids and timestamp. Furthermore, the usage of a dedicated topic to track DDL changes enables additional control over the table structures useful to replicate the database structure when needed.

You can dig more with the following resources: