Enabling change data capture from MySQL to Apache Kafka® with Debezium

Implement a change data capture workflow in Apache Kafka®, a key component of any organization with high data integrity requirements.

  • MySQL
  • Apache Kafka®
  • Tutorial
  • Data

Change Data Capture (CDC) is the process of tracking the changes happening in one or more database tables in a source system and propagating them to a target technology. The objective of this tutorial is to create a CDC flow from a source table in a MySQL database to Apache Kafka® via the Kafka Connect Debezium Source connector.

Debezium 2.5

This article describes the configuration for Debezium version 2.5 and later.

Change Data Capture from MySQL to Apache Kafka via Kafka Connect

To set up a change data capture process that tracks changes from a MySQL database to Apache Kafka, we can use Kafka Connect, a framework that integrates Apache Kafka with external systems.

There are two types of Kafka Connect connectors we can use to set up a CDC process between MySQL and Apache Kafka:

  • The JDBC Source connector, a query-based approach that periodically pulls the latest changes from MySQL into Kafka.
  • The Debezium Source connector, which reads the MySQL binary log (binlog) and streams the changes to Apache Kafka.

While both connectors can move MySQL data into Apache Kafka, the Debezium connector tracks the changes in streaming mode (not in a query-based batch mode). This allows us to enhance the extracted data with useful metadata like transaction IDs, timestamps, and pre- and post-update details.
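For a concrete picture, here is a simplified sketch of the change event envelope Debezium produces for a row update. The field values are illustrative; the real Avro-encoded output shown later in this tutorial wraps values in union types:

```json
{
  "before": { "id": 1, "username": "Francesco" },
  "after":  { "id": 1, "username": "Franz" },
  "source": {
    "connector": "mysql",
    "db": "defaultdb",
    "table": "users",
    "file": "binlog.000289",
    "pos": 197
  },
  "op": "u",
  "ts_ms": 1733226275527
}
```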

The following tutorial guides you through setting up a CDC process using the Debezium connector.

Environment setup

The first step in setting up a CDC process is to create the source MySQL database and the target Apache Kafka cluster with Kafka Connect enabled. We'll use Aiven for MySQL and Aiven for Apache Kafka® to do this.

  1. Create an Aiven account
  2. Install the Aiven Command Line Interface (CLI)
  3. Log in to the Aiven CLI using:
```bash
avn user login
```

Create an Aiven for MySQL service named demo-mysql-source:

```bash
avn service create demo-mysql-source \
  --service-type mysql \
  --plan free-1-5gb \
  --cloud aws-eu-west-1
```

Create an Aiven for Apache Kafka service named demo-kafka with Kafka Connect and Schema Registry enabled:

```bash
avn service create demo-kafka \
  --service-type kafka \
  --plan business-4 \
  --cloud aws-eu-west-1 \
  -c kafka_connect=true \
  -c schema_registry=true \
  -c kafka.auto_create_topics_enable=true
```

Wait for the services to start with:

```bash
avn service wait demo-mysql-source
avn service wait demo-kafka
```

Create and populate the MySQL table

Once the services are up and running, we can create the table in MySQL in which we'll track the changes. To create and populate the table:

Get the Aiven for MySQL connection details:

```bash
avn service get demo-mysql-source --format '{service_uri_params}'
```

Use the mysql command to connect to the database. Replace the placeholders <MYSQL_USERNAME>, <MYSQL_PASSWORD>, <MYSQL_HOSTNAME>, <MYSQL_PORT>, and <MYSQL_DATABASE_NAME> with the values taken from the command above. Alternatively, you can find the pre-filled mysql command under the Quick Connect button in the Aiven Console service overview.

```bash
mysql --user <MYSQL_USERNAME> \
  --password=<MYSQL_PASSWORD> \
  --host <MYSQL_HOSTNAME> \
  --port <MYSQL_PORT> \
  <MYSQL_DATABASE_NAME>
```

Note

You can install the mysql command line tool using the MySQL installation guide.

Create a table called users with a numeric id column and a username string column, and populate it with three rows, for Francesco, Ana, and Floor:

```sql
create table users (id serial primary key, username varchar(100));
insert into users (username) values ('Francesco'),('Ana'),('Floor');
```

Verify that the above commands were successful by checking the users table with:

```sql
select * from users;
```

The result should be similar to the following:

```
+----+-----------+
| id | username  |
+----+-----------+
|  1 | Francesco |
|  2 | Ana       |
|  3 | Floor     |
+----+-----------+
```

Create a Kafka Connect Connector

To create the CDC pipeline, we need to define the settings of the Kafka Connect Debezium Source connector. These settings contain:

  • The source MySQL connection parameters, which allow the connector to fetch the data
  • The target Apache Kafka connection parameters, which allow the connector to store the history of the table structure. This step is optional, but tracking DDL changes is useful to be able to replicate them to a target MySQL database.
  • The target Apache Kafka schema registry connection parameters, which allow the connector to define schemas and store the data in Apache Avro™ format.

Get the connection parameters

As done in the section above, we can use the avn service get command to retrieve the Aiven for MySQL parameters:

```bash
avn service get demo-mysql-source --format '{service_uri_params}'
```

Similarly, we can retrieve the Apache Kafka Schema Registry parameters needed for the schema integration:

```bash
avn service get demo-kafka --json | jq '.connection_info.schema_registry_uri'
```

Note

The above command uses jq to parse the avn command output and retrieve the connection URI, which will be in the format https://<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>@<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>. You'll need to split this URI into its components to fill in the Kafka Connect connector configuration file in the following section.
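If you want to avoid copying the pieces by hand, the following is a minimal bash sketch that splits the URI into its components, assuming the format above (the variable names are our own convention, not part of the Aiven CLI):

```bash
# Fetch the schema registry URI and split it into its components.
# Assumes the format https://<USER>:<PASSWORD>@<HOST>:<PORT> shown above.
SR_URI=$(avn service get demo-kafka --json | jq -r '.connection_info.schema_registry_uri')
SCHEMA_REGISTRY_USER=$(echo "$SR_URI"     | sed -E 's|https://([^:]+):.*|\1|')
SCHEMA_REGISTRY_PASSWORD=$(echo "$SR_URI" | sed -E 's|https://[^:]+:([^@]+)@.*|\1|')
APACHE_KAFKA_HOST=$(echo "$SR_URI"        | sed -E 's|.*@([^:]+):[0-9]+$|\1|')
SCHEMA_REGISTRY_PORT=$(echo "$SR_URI"     | sed -E 's|.*:([0-9]+)$|\1|')
echo "$APACHE_KAFKA_HOST $SCHEMA_REGISTRY_PORT $SCHEMA_REGISTRY_USER"
```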

To retrieve the Apache Kafka endpoint, which we'll use to track the DDL changes in a topic:

```bash
avn service get demo-kafka --format '{service_uri}'
```

Define the Debezium source connector configuration file

In a file named mysql_source_deb_connector.json, add the following JSON to define the Debezium connector configuration:

```json
{
    "name": "<CONNECTOR_NAME>",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<MYSQL_HOSTNAME>",
    "database.port": "<MYSQL_PORT>",
    "database.user": "<MYSQL_USERNAME>",
    "database.password": "<MYSQL_PASSWORD>",
    "database.dbname": "<MYSQL_DATABASE_NAME>",
    "database.ssl.mode": "<MYSQL_SSL_MODE>",
    "database.server.id": "<UNIQUE_ID>",
    "topic.prefix": "<KAFKA_TOPIC_PREFIX>",
    "table.include.list": "<MYSQL_TABLES>",
    "tasks.max": "<NR_TASKS>",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>",
    "schema.history.internal.kafka.topic": "<HISTORY_TOPIC_NAME>",
    "schema.history.internal.kafka.bootstrap.servers": "<APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT>",
    "schema.history.internal.producer.security.protocol": "SSL",
    "schema.history.internal.producer.ssl.keystore.type": "PKCS12",
    "schema.history.internal.producer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
    "schema.history.internal.producer.ssl.keystore.password": "password",
    "schema.history.internal.producer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
    "schema.history.internal.producer.ssl.truststore.password": "password",
    "schema.history.internal.producer.ssl.key.password": "password",
    "schema.history.internal.consumer.security.protocol": "SSL",
    "schema.history.internal.consumer.ssl.keystore.type": "PKCS12",
    "schema.history.internal.consumer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
    "schema.history.internal.consumer.ssl.keystore.password": "password",
    "schema.history.internal.consumer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
    "schema.history.internal.consumer.ssl.truststore.password": "password",
    "schema.history.internal.consumer.ssl.key.password": "password",
    "include.schema.changes": "true"
}
```

We need to replace the following values:

  • <CONNECTOR_NAME> is the name we're giving this connector. For this tutorial, set it to mysql_source_deb_connector.
  • The MySQL connection details from the previous section: <MYSQL_USERNAME>, <MYSQL_PASSWORD>, <MYSQL_HOSTNAME>, <MYSQL_PORT>, <MYSQL_DATABASE_NAME>, and <MYSQL_SSL_MODE>
  • <UNIQUE_ID> a number, which must be different for each connector running. For this tutorial, set it to 12345.
  • <KAFKA_TOPIC_PREFIX> will be used as a prefix for all the Kafka topics that this connector sends events to. For this tutorial, set it to example.
  • <MYSQL_TABLES> is the list of MySQL tables to capture changes from. This parameter is optional. The default is the empty string, which means capture all tables. For this tutorial, set it to defaultdb.users, the one table we have. (Note that there's also a parameter called table.exclude.list for the opposite effect.)
  • <NR_TASKS> should be set to 1, since the MySQL connector always uses a single task. This is actually the default value, so the tasks.max parameter can also be omitted.
  • <HISTORY_TOPIC_NAME> is the name of the Apache Kafka topic that contains the history of schema changes. For this tutorial, set it to ddl_history.
  • The Apache Kafka schema registry details from the previous section: <APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT> and <SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>
  • The Apache Kafka details from the previous section: <APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT>
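If you prefer scripting the substitution over editing the file by hand, a small sed sketch like the following can fill in the fixed values used in this tutorial (the .template file name is our own convention, not something the connector requires):

```bash
# Fill in the tutorial's fixed values; the remaining connection placeholders
# still need the values retrieved in the previous section.
sed -e 's|<CONNECTOR_NAME>|mysql_source_deb_connector|' \
    -e 's|<UNIQUE_ID>|12345|' \
    -e 's|<KAFKA_TOPIC_PREFIX>|example|' \
    -e 's|<MYSQL_TABLES>|defaultdb.users|' \
    -e 's|<NR_TASKS>|1|' \
    -e 's|<HISTORY_TOPIC_NAME>|ddl_history|' \
    mysql_source_deb_connector.json.template > mysql_source_deb_connector.json
```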

See the Debezium MySQL connector documentation for more information.

Start the Debezium source connector

Once the parameters are substituted, we can start the Debezium source connector with:

```bash
avn service connector create demo-kafka @mysql_source_deb_connector.json
```

Since we set the name parameter to mysql_source_deb_connector, the above starts a connector with that name.

We can check its status with:

```bash
avn service connector status demo-kafka mysql_source_deb_connector
```

The status should be RUNNING.

```json
{
    "status": {
        "state": "RUNNING",
        "tasks": [
            {
                "id": 0,
                "state": "RUNNING",
                "trace": ""
            }
        ]
    }
}
```
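Alternatively, if you prefer talking to the Kafka Connect REST API directly, the standard status endpoint returns the same information. The host, port, and credentials below are placeholders you'd take from your own service details:

```bash
# Query the standard Kafka Connect REST API status endpoint.
# <CONNECT_HOST>, <CONNECT_PORT>, <CONNECT_USER> and <CONNECT_PASSWORD> are placeholders.
curl -s -u <CONNECT_USER>:<CONNECT_PASSWORD> \
  "https://<CONNECT_HOST>:<CONNECT_PORT>/connectors/mysql_source_deb_connector/status" | jq
```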

Check the data in Apache Kafka with kcat

If the connector is up and running, we can check that the data is flowing from MySQL to Apache Kafka with kcat. Get the kcat parameters and download the certificates needed to connect with:

```bash
avn service connection-info kcat demo-kafka -u avnadmin -W
```

The above command stores the three credential files (ca.pem, service.crt, and service.key) in the current local folder and returns the kcat command to execute.

To verify the data in Apache Kafka we can use the following command, substituting the placeholders <KAFKA_HOST>, <KAFKA_PORT>, <KAFKA_SCHEMA_REGISTRY_USR>, <KAFKA_SCHEMA_REGISTRY_PWD>, and <KAFKA_SCHEMA_REGISTRY_PORT>:

```bash
kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
  -X security.protocol=SSL \
  -X ssl.ca.location=ca.pem \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.crt \
  -C -t example.defaultdb.users \
  -s avro \
  -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>
```

Note

The above command consumes (-C flag) from the topic example.defaultdb.users (-t flag) in Apache Avro format (-s flag). The topic name is the concatenation of the connector's topic.prefix parameter (example) and the table name (defaultdb.users).
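If you're unsure which topics the connector created, kcat's metadata mode can list all topics on the cluster, using the same connection flags:

```bash
# List cluster metadata (brokers, topics, partitions).
kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
  -X security.protocol=SSL \
  -X ssl.ca.location=ca.pem \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.crt \
  -L
```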

After executing the kcat command, the data in Apache Kafka should be similar to the following:

```
{"id": 1}{"before": null, "after": {"Value": {"id": 1, "username": {"string": "Francesco"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "first"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275527}, "transaction": null}
{"id": 2}{"before": null, "after": {"Value": {"id": 2, "username": {"string": "Ana"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275530}, "transaction": null}
{"id": 3}{"before": null, "after": {"Value": {"id": 3, "username": {"string": "Floor"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "last"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275531}, "transaction": null}
% Reached end of topic example.defaultdb.users [0] at offset 3
```

Not only do we get the three rows stored in the users table, but we also get useful additional metadata, like when the insert/update/delete happened (in the ts_ms field) and the position in the binlog file.

If we now add a new row to the MySQL users table with:

```sql
insert into users(username) values ('Carlo');
```

We should see it immediately pop up in the Apache Kafka topic:

```
{"id": 4}{"before": null, "after": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226538000, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:39"}, "file": "binlog.000290", "pos": 420, "row": 0, "thread": {"long": 20671}, "query": null}, "op": "c", "ts_ms": {"long": 1733226538394}, "transaction": null}
% Reached end of topic example.defaultdb.users [0] at offset 4
```

If we now remove the row containing Carlo with:

```sql
delete from users where username='Carlo';
```

We see the corresponding delete ("op": "d") in Apache Kafka:

```
{"id": 4}{"before": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "after": null, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226572000, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:40"}, "file": "binlog.000290", "pos": 725, "row": 0, "thread": {"long": 20671}, "query": null}, "op": "d", "ts_ms": {"long": 1733226572691}, "transaction": null}
{"id": 4}
% Reached end of topic example.defaultdb.users [0] at offset 6
```

Altering the schema

Where is the MySQL schema stored? There are two topics:

  • one called ddl_history (named by the schema.history.internal.kafka.topic parameter), which stores the schema history for Debezium's internal use, so that the connector can resume work if it crashes
  • the other called example (based on the topic.prefix parameter), where the connector stores all the DDLs needed to replicate the state in another MySQL database.

We can fetch the latter topic with kcat:

```bash
kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
  -X security.protocol=SSL \
  -X ssl.ca.location=ca.pem \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.crt \
  -C -t example \
  -s avro \
  -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>
```

We get the full list of DDLs needed to replicate the state into a new MySQL database.

```
{"databaseName": ""}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226274272, "snapshot": {"string": "true"}, "db": "", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275042, "databaseName": {"string": ""}, "schemaName": null, "ddl": {"string": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275044, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275086, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP TABLE IF EXISTS `defaultdb`.`users`"}, "tableChanges": [{"type": "DROP", "id": "\"defaultdb\".\"users\"", "table": null}]}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275092, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275098, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP DATABASE IF EXISTS `defaultdb`"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275098, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275107, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE DATABASE `defaultdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275107, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275109, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "USE `defaultdb`"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275124, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275197, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE TABLE \"users\" (\n \"id\" bigint unsigned NOT NULL AUTO_INCREMENT,\n \"username\" varchar(100) DEFAULT NULL,\n PRIMARY KEY (\"id\"),\n UNIQUE KEY \"id\" (\"id\")\n)"}, "tableChanges": [{"type": "CREATE", "id": "\"defaultdb\".\"users\"", "table": {"Table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}], "comment": null}}}]}
% Reached end of topic example [0] at offset 6
```

Warning

The response also contains DROP TABLE and DROP DATABASE statements, which, if replayed as-is, might delete other tables present in the same target database.

If we now add a column to the source MySQL database with:

```sql
alter table users add column country varchar(3);
```

We get a new entry in the example topic defining the DDL change we just implemented:

```
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226759134, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:41"}, "file": "binlog.000291", "pos": 274, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226759153, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "alter table users add column country varchar(3)"}, "tableChanges": [{"type": "ALTER", "id": "\"defaultdb\".\"users\"", "table": {"Table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "country", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 3}, "scale": null, "position": 3, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}], "comment": null}}}]}
% Reached end of topic example [0] at offset 7
```

Conclusion

The Debezium connector for Apache Kafka tracks, in real time, all the changes happening in one or more MySQL tables by reading the binlog, and streams them into a series of topics. The messages in Apache Kafka contain the updated data and are enriched with metadata such as transaction IDs and timestamps. Furthermore, a dedicated topic tracking DDL changes gives additional control over the table structures, which is useful for replicating the database structure when needed.

You can dig deeper with the following resources:

  • Create a Debezium source connector from MySQL to Apache Kafka® in the Aiven documentation
  • Debezium source connector to MySQL
  • Debezium data type mappings
  • MySQL binlog

