Database migration with Apache Kafka® and Apache Kafka® Connect

Find out how to use Apache Kafka® to migrate across database technologies while keeping the target continually in sync with the source.


This article has been updated (July 2022) to use pgoutput instead of wal2json, as wal2json will be deprecated in Debezium 2.0.

Technologies change all the time, and the best platform choice at the start of the project sometimes does not look so great after time has passed or requirements have become clearer. This is true for databases as well, but managing transitions in this area of your application can be disruptive if there are multiple systems involved.

The ideal situation is to enable both the old and new database platforms to be in use at once, and kept perfectly in sync. This allows engineering teams to perform safe migrations without all needing to be in lockstep with one another. As it happens, this sort of linking technology exists, and its name is Apache Kafka® Connect.

In this blog post we'll explore the use of Apache Kafka® and Apache Kafka® Connect as a streaming bridge between different database technologies, and see how the Debezium connector tracks changes as soon as they happen and propagates them to a Kafka topic.

The big picture


Here's a useful read if your database migration tool questions are still in their early stages.

For this blog post, we define a scenario where the source database is PostgreSQL® and the target is MySQL, and we'll create a streaming replica from source to target. To achieve this, we need a data streaming platform: Apache Kafka gives us the ability to ingest and consume data in near real time on a platform proven at scale.

The missing piece is the connection between the various components, but the good news is that we don't have to bring in additional tools: Apache Kafka Connect, part of the Apache Kafka ecosystem, lets us simply define data sources and targets, making data extraction and loading a matter of a couple of config files.

Apache Kafka Connect offers a huge variety of connectors enabling integrations between databases, data stores, HTTP endpoints, and analytics tools. The complete list of Aiven's supported managed connectors is available in our dedicated documentation.

When dealing with a relational source database, Apache Kafka Connect provides a couple of options. We could use the JDBC approach, as explained in a previous post, querying the source database on a polling interval. If, on the other hand, we want to embrace the event-driven paradigm by detecting and streaming every change as soon as it happens, we need a proper Change Data Capture solution like Debezium.

Diagram showing the components: PostgreSQL, Apache Kafka, Kafka Connect, Debezium and MySQL

Create the environments

The complete set of technologies is fully open source. For the sake of simplicity I'll use the Aiven managed services and deploy the required services with the help of the Aiven CLI. Let's start by creating an Aiven for Apache Kafka service named demo-kafka in the google-europe-west3 region with the juicy business-4 plan. During service creation we can enable Apache Kafka Connect, the schema registry functionality provided by Aiven's Karapace, and automatic topic creation.

avn service create demo-kafka \
  --service-type kafka \
  --cloud google-europe-west3 \
  --plan business-4 \
  -c kafka_connect=true \
  -c schema_registry=true \
  -c kafka.auto_create_topics_enable=true

Creating the PostgreSQL database follows a similar pattern. This time the name is demo-pg (I'm low in creativity today), and the service is located in the same region as the Apache Kafka service to minimize latency, using a smaller startup-4 plan.

avn service create demo-pg \
  --service-type pg \
  --cloud google-europe-west3 \
  --plan startup-4

The final piece is represented by the target database, MySQL in our scenario:

avn service create demo-mysql \
  --service-type mysql \
  --cloud google-europe-west3 \
  --plan business-4

Let's now grab a quick espresso: we have a couple of minutes of waiting time before the services are created. The long waiting times for server provisioning are gone. We can also check the service creation progress, and be notified when it's finished, using the Aiven CLI wait command.

Create a dataset in PostgreSQL

Once our three services are in running state, we define our data playground by creating a small table in PostgreSQL containing data about a thing where precision and consistency across technologies matters: pasta and cooking minutes!

We can connect to the demo-pg PostgreSQL database using the dedicated command:

avn service cli demo-pg

The utility fetches the connection parameters and uses psql to connect. Once in the database, we create the pasta table and insert a few rows with the following command:

create table pasta (id serial, name varchar primary key, cooking_minutes int);
alter table pasta replica identity full;
insert into pasta (name, cooking_minutes) values ('spaghetti', 8);
insert into pasta (name, cooking_minutes) values ('pennette', 7);
insert into pasta (name, cooking_minutes) values ('linguine', 10);
insert into pasta (name, cooking_minutes) values ('farfalle', 9);

Capture changes with the Debezium source connector

We covered the JDBC source connector in a previous blog post but, as explained above, the JDBC route only queries the database on polling intervals, possibly adding a delay to the event capture. The alternative is the Debezium source connector for PostgreSQL, which extracts the changes committed to the transaction log and publishes them in a standard format to an Apache Kafka topic.

To set it up, we need to define a configuration file named connector_pg_source.json with the following content:

{
  "name": "cdc-source-pg",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "[PG_HOSTNAME]",
  "database.port": "[PG_PORT]",
  "database.user": "[PG_USER]",
  "database.password": "[PG_PASSWORD]",
  "database.dbname": "[PG_DB_NAME]",
  "database.sslmode": "[PG_SSL_MODE]",
  "plugin.name": "pgoutput",
  "slot.name": "test_slot",
  "publication.name": "test_pub",
  "database.server.name": "my_pg_source",
  "table.include.list": "public.pasta",
  "tombstones.on.delete": "true",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]"
}

The configuration file states that we are going to create a Debezium source connector named cdc-source-pg, with the pgoutput PostgreSQL output plugin, using a logical replication publication named test_pub and a replication slot named test_slot. Moreover, we are using my_pg_source as the server name, which drives the topic prefix, and including the table public.pasta in the replica.

Finally we create tombstone messages for deletions (tombstones.on.delete) and we convert keys and values to Avro since, compared to JSON, it is far lighter on the network and allows us to have the events schema defined, which will be handy once we push the data. You can check each parameter definition in the dedicated documentation.

To make it work we need to substitute a few placeholders:

  • [PG_HOSTNAME], [PG_PORT], [PG_USER], [PG_PASSWORD], [PG_DB_NAME], [PG_SSL_MODE]: these are the PostgreSQL connection parameters, and, if you're using Aiven, are available with:
avn service get demo-pg --format '{service_uri_params}'
  • [APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]: We are using Aiven's Karapace schema registry to store Avro messages' schemas. To retrieve the connection URI execute the following command:
avn service get demo-kafka --json | jq '.connection_info.schema_registry_uri'

The schema registry URI contains both the url and the credentials in the form http://avnadmin:kafka_schema_registry_password@kafka_host:schema_registry_port. We can therefore also fill the [SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD] placeholders accordingly.
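Splitting that URI by hand is error-prone, so here is a minimal Python sketch of how the four placeholders could be extracted from it. The function name and the example URI are made up for illustration; only the URI shape comes from the description above.

```python
from urllib.parse import unquote, urlsplit


def split_schema_registry_uri(uri: str) -> dict:
    """Split a schema registry URI into the connector placeholders."""
    parts = urlsplit(uri)
    return {
        "APACHE_KAFKA_HOST": parts.hostname,
        "SCHEMA_REGISTRY_PORT": parts.port,
        # Credentials may be percent-encoded inside a URI, so decode them.
        "SCHEMA_REGISTRY_USER": unquote(parts.username or ""),
        "SCHEMA_REGISTRY_PASSWORD": unquote(parts.password or ""),
    }


# Hypothetical URI, shaped like the one returned by the command above.
example_uri = "https://avnadmin:s3cret@demo-kafka.example.com:12345"
for placeholder, value in split_schema_registry_uri(example_uri).items():
    print(f"[{placeholder}] = {value}")
```

The printed values can then be pasted into the connector configuration file in place of the matching placeholders.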

We can now start the change data capture flow with the help of the Aiven CLI in our terminal:

avn service connector create demo-kafka @connector_pg_source.json

Avoid the "must be superuser" error

When creating the connector, you might encounter the error must be superuser to create FOR ALL TABLES publication. This happens because the connector tries to create a publication covering all tables. To solve the problem, you can create the publication directly in PostgreSQL by:

  • installing the aiven_extras extension

  • creating the publication for the public.pasta table

    SELECT * FROM aiven_extras.pg_create_publication( 'test_pub', 'INSERT,UPDATE,DELETE', 'public.pasta' );

Once the publication exists, restart the connector and it should work.

Check the data in Apache Kafka

The Debezium source connector creates a topic named my_pg_source.public.pasta (the concatenation of the database.server.name parameter and the schema and table names). To inspect it, we can use kcat; more details on its configuration can be found in the dedicated documentation.
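As a quick illustration of that naming rule, the following Python sketch builds the topic name from its three parts. The helper name is hypothetical; the values are the ones used in this post.

```python
def debezium_topic_name(server_name: str, schema: str, table: str) -> str:
    """Join server name, schema and table with dots, as described above."""
    return ".".join([server_name, schema, table])


print(debezium_topic_name("my_pg_source", "public", "pasta"))
```

Adding more tables to table.include.list would, by the same rule, result in one topic per table.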

Once SSL keys have been downloaded and the kcat configuration file is set, we can check the data in Apache Kafka with the following command in a new terminal window:

kcat -F kcat.config \
  -C -t my_pg_source.public.pasta \
  -s avro \
  -r https://[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]@[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT] \
  -f 'Key: %k\nValue: %s\n'

The above calls kcat with the following parameters:

  • -F kcat.config: using the kcat.config file for connection details
  • -C -t my_pg_source.public.pasta: kcat is called in consumer mode (-C) reading from the my_pg_source.public.pasta topic
  • -s avro: the expected topic data format is Avro
  • -r https://[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]@[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]: Aiven's Karapace schema registry connection URI including username and password. Replace the placeholders with the correct values fetched above.
  • -f 'Key: %k\nValue: %s\n': output format, for each event displays the key (%k) and value (%s)

The output of the kcat call is the following:

% Reading configuration from file kcat.config
Key: {"name": "spaghetti"}
Value: {"before": null, "after": {"Value": {"id": 1, "name": "spaghetti", "cooking_minutes": {"int": 8}}}, "source": {"version": "1.7.0.Final", "connector": "postgresql", "name": "my_pg_source", "ts_ms": 1639385467887, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": {"string": "[null,\"84042304\"]"}, "schema": "public", "table": "pasta", "txId": {"long": 529}, "lsn": {"long": 84042304}, "xmin": null}, "op": "r", "ts_ms": {"long": 1639385467890}, "transaction": null}
Key: {"name": "pennette"}
Value: {"before": null, "after": {"Value": {"id": 2, "name": "pennette", "cooking_minutes": {"int": 7}}}, "source": {"version": "1.7.0.Final", "connector": "postgresql", "name": "my_pg_source", "ts_ms": 1639385467899, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": {"string": "[null,\"84042304\"]"}, "schema": "public", "table": "pasta", "txId": {"long": 529}, "lsn": {"long": 84042304}, "xmin": null}, "op": "r", "ts_ms": {"long": 1639385467900}, "transaction": null}
Key: {"name": "linguine"}
Value: {"before": null, "after": {"Value": {"id": 3, "name": "linguine", "cooking_minutes": {"int": 10}}}, "source": {"version": "1.7.0.Final", "connector": "postgresql", "name": "my_pg_source", "ts_ms": 1639385467901, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": {"string": "[null,\"84042304\"]"}, "schema": "public", "table": "pasta", "txId": {"long": 529}, "lsn": {"long": 84042304}, "xmin": null}, "op": "r", "ts_ms": {"long": 1639385467902}, "transaction": null}
Key: {"name": "farfalle"}
Value: {"before": null, "after": {"Value": {"id": 4, "name": "farfalle", "cooking_minutes": {"int": 9}}}, "source": {"version": "1.7.0.Final", "connector": "postgresql", "name": "my_pg_source", "ts_ms": 1639385467903, "snapshot": {"string": "last"}, "db": "defaultdb", "sequence": {"string": "[null,\"84042304\"]"}, "schema": "public", "table": "pasta", "txId": {"long": 529}, "lsn": {"long": 84042304}, "xmin": null}, "op": "r", "ts_ms": {"long": 1639385467904}, "transaction": null}
% Reached end of topic my_pg_source.public.pasta [0] at offset 4

During the first run, the Debezium connector creates a new message in the Apache Kafka topic for each row present in the pasta table. For each row we can see the current value (in the after section of the JSON document) together with additional information regarding timestamps and transaction ids. It's worth mentioning that rows updated or deleted before we started the change data capture are not detected, since Debezium doesn't go back in time. But if we now want our pennette to be a bit more "al dente", we can update the row in the PostgreSQL database with:

update pasta set cooking_minutes=6 where name='pennette';

We can see the resulting update in kcat as:

Key: {"name": "pennette"}
Value: {"before": {"Value": {"id": 2, "name": "pennette", "cooking_minutes": {"int": 7}}}, "after": {"Value": {"id": 2, "name": "pennette", "cooking_minutes": {"int": 6}}}, "source": {"version": "1.7.0.Final", "connector": "postgresql", "name": "my_pg_source", "ts_ms": 1639387523428, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": {"string": "[null,\"201327488\"]"}, "schema": "public", "table": "pasta", "txId": {"long": 740}, "lsn": {"long": 201327488}, "xmin": null}, "op": "u", "ts_ms": {"long": 1639387523873}, "transaction": null}

In case of updates ("op": "u"), the Debezium source connector provides the situation as before the change ("before": {"Value": {"id": 2, "name": "pennette", "cooking_minutes": {"int": 7}}}) and after it ("after": {"Value": {"id": 2, "name": "pennette", "cooking_minutes": {"int": 6}}}).
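To make the before/after structure easier to work with, here is a minimal Python sketch that strips the Avro JSON union wrappers (such as {"int": 7} and {"Value": {...}}) from an event printed by kcat and lists the columns that changed. The helper names are hypothetical, and the set of wrapper keys is an assumption based only on the output shown in this post, not a complete list of Avro types.

```python
import json

# Union branch names seen in the kcat output of this post (an assumption,
# not exhaustive). A column literally named like one of these would be
# mishandled by this sketch.
UNION_BRANCHES = {"int", "long", "string", "Value"}


def unwrap(node):
    """Recursively strip single-key union wrappers such as {"int": 7}."""
    if isinstance(node, dict):
        if len(node) == 1:
            (key, value), = node.items()
            if key in UNION_BRANCHES:
                return unwrap(value)
        return {k: unwrap(v) for k, v in node.items()}
    if isinstance(node, list):
        return [unwrap(v) for v in node]
    return node


def changed_columns(event: dict) -> dict:
    """Return {column: (old, new)} for the columns that differ."""
    before = unwrap(event["before"]) or {}
    after = unwrap(event["after"]) or {}
    return {c: (before.get(c), after[c]) for c in after if before.get(c) != after[c]}


# Shortened copy of the update event shown above (source metadata omitted).
event = json.loads(
    '{"before": {"Value": {"id": 2, "name": "pennette", "cooking_minutes": {"int": 7}}},'
    ' "after": {"Value": {"id": 2, "name": "pennette", "cooking_minutes": {"int": 6}}},'
    ' "op": "u"}'
)
print(changed_columns(event))  # {'cooking_minutes': (7, 6)}
```

Having both states in the same event is possible here because the pasta table was created with replica identity full.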

Now that the data is in a Kafka topic, we can serve it to one or many downstream data sinks as it is or, if we want to process and manipulate the data in real time, we could define some stream processing data pipelines using Aiven for Apache Flink.

Sink data to MySQL

Let's stick to the original plan: a simple PostgreSQL -> Apache Kafka -> MySQL copy of the data. To achieve the last segment, we can create another Kafka Connect connector, this time a JDBC sink, by defining its properties in another configuration file (we'll refer to the file as connector_sink_mysql.json) with the following content:

{
  "name": "cdc-sink-mysql",
  "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
  "topics": "my_pg_source.public.pasta",
  "connection.url": "jdbc:mysql://[MYSQL_HOSTNAME]:[MYSQL_PORT]/[MYSQL_DB_NAME]?ssl-mode=REQUIRED",
  "connection.user": "[MYSQL_USER]",
  "connection.password": "[MYSQL_PASSWORD]",
  "insert.mode": "upsert",
  "table.name.format": "pasta_mysql",
  "pk.mode": "record_key",
  "pk.fields": "name",
  "auto.create": "true",
  "transforms": "newrecordstate",
  "transforms.newrecordstate.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.newrecordstate.drop.tombstones": "false",
  "transforms.newrecordstate.delete.handling.mode": "rewrite",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]"
}

The configuration file states that we are going to create a JDBC sink connector named cdc-sink-mysql, reading from the topic my_pg_source.public.pasta and pushing data to a MySQL database table called pasta_mysql in upsert mode. The primary key will be based on a field of the record_key called name.
Moreover we are enabling the target table automatic creation if it doesn't exist (auto.create), extracting the new record state (ExtractNewRecordState) and propagating record deletions as soft deletes ("delete.handling.mode":"rewrite"). To read more about Debezium related message transformation check the dedicated documentation.
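To give an intuition for what the transformation does to each message, here is a rough Python sketch of the flattening with delete handling in rewrite mode. It is an illustration of the behaviour described above, not Debezium's implementation: the function is hypothetical, it works on plain dictionaries whose Avro wrappers have already been removed, and it represents the __deleted flag as the strings that later show up in the MySQL table.

```python
def extract_new_record_state(envelope):
    """Flatten a change event envelope into a single row, soft-delete style."""
    if envelope is None:
        # Tombstone message: left untouched, since drop.tombstones is false.
        return None
    if envelope["op"] == "d":
        # Deletes keep the last known row state and are flagged as deleted.
        row = dict(envelope["before"])
        row["__deleted"] = "true"
    else:
        # Snapshot reads, inserts and updates keep only the new row state.
        row = dict(envelope["after"])
        row["__deleted"] = "false"
    return row


update = {
    "op": "u",
    "before": {"id": 2, "name": "pennette", "cooking_minutes": 7},
    "after": {"id": 2, "name": "pennette", "cooking_minutes": 6},
}
delete = {
    "op": "d",
    "before": {"id": 3, "name": "linguine", "cooking_minutes": 10},
    "after": None,
}
print(extract_new_record_state(update))
print(extract_new_record_state(delete))
```

The flattened row is what the JDBC sink receives, which is why the target table ends up with the source columns plus a __deleted column.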

Finally, since our messages are in Avro format, we define again the key.converter and value.converter to extract the related schemas from Aiven Karapace and decode the messages.

Again, to put the connector into action, we need to replace some placeholders:

  • [MYSQL_HOSTNAME], [MYSQL_PORT], [MYSQL_DB_NAME], [MYSQL_USER], [MYSQL_PASSWORD]: these are the MySQL connection parameters, and, if you're using Aiven, are available with:
avn service get demo-mysql --format '{service_uri}'
  • [APACHE_KAFKA_HOST], [SCHEMA_REGISTRY_PORT], [SCHEMA_REGISTRY_USER], [SCHEMA_REGISTRY_PASSWORD]: we can reuse the same Karapace schema registry connection parameters fetched above.
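Replacing placeholders by hand makes it easy to miss one, so the substitution could be scripted. The following Python sketch is one possible approach under stated assumptions: the helper name is hypothetical, the template is a shortened version of the sink configuration, and the values are made up. It fails loudly if any [PLACEHOLDER] is left and parses the result to confirm it is still valid JSON.

```python
import json
import re


def fill_placeholders(template: str, values: dict) -> dict:
    """Replace [NAME] placeholders and return the parsed connector config."""
    filled = template
    for name, value in values.items():
        filled = filled.replace(f"[{name}]", str(value))
    leftover = sorted(set(re.findall(r"\[[A-Z_]+\]", filled)))
    if leftover:
        raise ValueError(f"unfilled placeholders: {leftover}")
    # Parsing also checks that the substituted text is well-formed JSON;
    # values containing quotes or backslashes would need escaping first.
    return json.loads(filled)


# Shortened template and made-up values, for illustration only.
template = """{
  "name": "cdc-sink-mysql",
  "connection.url": "jdbc:mysql://[MYSQL_HOSTNAME]:[MYSQL_PORT]/[MYSQL_DB_NAME]?ssl-mode=REQUIRED",
  "connection.user": "[MYSQL_USER]",
  "connection.password": "[MYSQL_PASSWORD]"
}"""
values = {
    "MYSQL_HOSTNAME": "demo-mysql.example.com",
    "MYSQL_PORT": 3306,
    "MYSQL_DB_NAME": "defaultdb",
    "MYSQL_USER": "avnadmin",
    "MYSQL_PASSWORD": "s3cret",
}
config = fill_placeholders(template, values)
print(config["connection.url"])
```

Writing the returned dictionary back with json.dump would produce a file ready to pass to the connector creation command.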

We are ready to create the sink connector with the Aiven CLI with:

avn service connector create demo-kafka @connector_sink_mysql.json

Check the data in MySQL

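After creating the connector, we should see the pasta_mysql table created in the MySQL target database. To check things out, we can connect using the mysql client from a new terminal window, reusing the same connection parameters we got before (the -p flag prompts for [MYSQL_PASSWORD]):

mysql -u [MYSQL_USER] -P [MYSQL_PORT] -h [MYSQL_HOSTNAME] -D [MYSQL_DB_NAME] -p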

After connecting, we can check the data in the pasta_mysql table with:

select * from pasta_mysql;

And we can see the updated data as per the PostgreSQL table:

+------+-----------+-----------------+-----------+
| id   | name      | cooking_minutes | __deleted |
+------+-----------+-----------------+-----------+
|    4 | farfalle  |               9 | false     |
|    3 | linguine  |              10 | false     |
|    2 | pennette  |               6 | false     |
|    1 | spaghetti |               8 | false     |
+------+-----------+-----------------+-----------+
4 rows in set (0.02 sec)

Now let's make a couple of changes on the PostgreSQL side, changing the format from spaghetti to spaghettini and removing the linguine row:

update pasta set name='spaghettini' where name='spaghetti';
delete from pasta where name='linguine';

We can check that the same changes are immediately applied on the MySQL side: both the spaghetti and linguine are now soft deleted (__deleted equal true) and the new entry spaghettini is correctly stored.

+------+-------------+-----------------+-----------+
| id   | name        | cooking_minutes | __deleted |
+------+-------------+-----------------+-----------+
|    4 | farfalle    |               9 | true      |
|    3 | linguine    |              10 | true      |
|    2 | pennette    |               6 | false     |
|    1 | spaghetti   |               8 | true      |
|    1 | spaghettini |               8 | false     |
+------+-------------+-----------------+-----------+
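Why does renaming spaghetti leave a soft-deleted spaghetti row behind? Since name is the primary key, Debezium represents a primary key change as a delete for the old key followed by a create for the new key. The Python sketch below replays the flattened rows from this post through a dictionary keyed on name, as a simplified stand-in for the upsert behaviour of the sink; the function and the in-memory table are hypothetical.

```python
def apply_upserts(table: dict, rows: list, pk: str = "name") -> dict:
    """Insert or overwrite each row using its primary key, like an upsert."""
    for row in rows:
        table[row[pk]] = row
    return table


rows = [
    # Initial snapshot
    {"id": 1, "name": "spaghetti", "cooking_minutes": 8, "__deleted": "false"},
    {"id": 2, "name": "pennette", "cooking_minutes": 7, "__deleted": "false"},
    {"id": 3, "name": "linguine", "cooking_minutes": 10, "__deleted": "false"},
    {"id": 4, "name": "farfalle", "cooking_minutes": 9, "__deleted": "false"},
    # update pasta set cooking_minutes=6 where name='pennette'
    {"id": 2, "name": "pennette", "cooking_minutes": 6, "__deleted": "false"},
    # Primary key change: delete for the old key, create for the new key
    {"id": 1, "name": "spaghetti", "cooking_minutes": 8, "__deleted": "true"},
    {"id": 1, "name": "spaghettini", "cooking_minutes": 8, "__deleted": "false"},
    # delete from pasta where name='linguine'
    {"id": 3, "name": "linguine", "cooking_minutes": 10, "__deleted": "true"},
]

table = apply_upserts({}, rows)
for name in sorted(table):
    print(table[name])
```

The printed rows match the MySQL output: five rows, with spaghetti and linguine flagged as deleted and spaghettini stored as a new entry.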

Keep data in sync to support technical change

Apache Kafka and Apache Kafka Connect provide integrations and flexibility; a way to bridge technologies keeping data in sync by detecting changes and propagating them in near real time. Beyond that, by making data flow through Apache Kafka we can also create multiple downstream data pipelines into different technologies, by adding more Kafka Connect connectors without impacting the source database.

By using the Debezium Kafka Connect connector, you can track every modification performed in the source database, and store and propagate additional information such as the pre and post data points, change type, and related timestamps. This extra metadata really improves and empowers our change data capture process.
