Create a JDBC sink connector from Apache Kafka® to another database
The JDBC (Java Database Connectivity) sink connector enables you to move data from an Aiven for Apache Kafka® cluster to any relational database that offers a JDBC driver, such as PostgreSQL® or MySQL.
The JDBC sink connector requires topics to have a schema to transfer data to relational databases. You can define or manage this schema for each topic through the Karapace schema registry.
For a complete list of parameters and configuration options, see the connector's documentation.
Prerequisites
- Aiven for Apache Kafka service with Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect cluster.
- Gather the following information about your target database service:
  - `DB_CONNECTION_URL`: The database JDBC connection URL. Examples:
    - PostgreSQL: `jdbc:postgresql://HOST:PORT/DB_NAME?sslmode=SSL_MODE`
    - MySQL: `jdbc:mysql://HOST:PORT/DB_NAME?ssl-mode=SSL_MODE`
  - `DB_USERNAME`: The database username to connect with.
  - `DB_PASSWORD`: The password for the selected username.
  - `TOPIC_LIST`: The comma-separated list of topics to sink.
  - `APACHE_KAFKA_HOST`: The hostname of the Apache Kafka service. Required only when using Avro as the data format.
  - `SCHEMA_REGISTRY_PORT`: The Apache Kafka schema registry port. Required only when using Avro as the data format.
  - `SCHEMA_REGISTRY_USER`: The Apache Kafka schema registry username. Required only when using Avro as the data format.
  - `SCHEMA_REGISTRY_PASSWORD`: The Apache Kafka schema registry user password. Required only when using Avro as the data format.
For Aiven for PostgreSQL® and Aiven for MySQL®, access the connection details (URL, username, and password) on the Overview page of your service in the Aiven Console, or retrieve them using the `avn service get` command in the Aiven CLI.
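For example, a minimal sketch with the Aiven CLI, assuming a target service named `pg-demo` (substitute your own service name):

```bash
# Print the service details, including the connection URI, user, and
# password, in JSON format. "pg-demo" is a placeholder service name.
avn service get pg-demo --json
```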
Locate the `SCHEMA_REGISTRY_*` parameters in the Schema Registry tab under Connection information on the service Overview page.
As of Apache Kafka version 3.0, Aiven for Apache Kafka no longer supports Confluent Schema Registry. Consider using Karapace instead.
Set up a JDBC sink connector with the Aiven Console
The following example demonstrates setting up a JDBC sink connector for Apache Kafka using the Aiven Console.
Define a Kafka Connect configuration file
Define a Kafka Connect configuration file, such as `jdbc_sink.json`, with the following connector settings:
{
"name":"CONNECTOR_NAME",
"connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
"topics": "TOPIC_LIST",
"connection.url": "DB_CONNECTION_URL",
"connection.user": "DB_USERNAME",
"connection.password": "DB_PASSWORD",
"tasks.max":"1",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "field1,field2",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
}
The configuration file contains the following entries:
- `name`: The connector name.
- `connector.class`: Specifies the class Kafka Connect uses to create the connector. For JDBC sink connectors, use `io.aiven.connect.jdbc.JdbcSinkConnector`.
- `connection.url`, `connection.user`, `connection.password`: JDBC parameters for the sink database, collected in the prerequisite phase.
- `topics`: The list of Apache Kafka topics to sink into the database.
- `tasks.max`: The maximum number of tasks to execute in parallel, at most one task per topic partition.
- `auto.create`: Enables automatic creation of the target table in the database if it doesn't exist.
- `auto.evolve`: Enables automatic modification of the target table schema to match changes in the Apache Kafka topic messages.
- `insert.mode`: Defines how data is inserted into the database:
  - `insert`: Standard `INSERT` statements.
  - `upsert`: Upsert semantics supported by the target database. See the dedicated GitHub repository.
  - `update`: Update semantics supported by the target database. See the dedicated GitHub repository.
- `pk.mode`: Defines how the connector identifies rows in the target table (primary key):
  - `none`: No primary key is used.
  - `kafka`: Apache Kafka coordinates are used.
  - `record_key`: The message key, in whole or in part, is used.
  - `record_value`: The message value, in whole or in part, is used.

  For more information, see the dedicated GitHub repository.
- `pk.fields`: Defines which fields of the composite key or value to use as the record key in the database.
- `key.converter` and `value.converter`: Define the data format of messages within the Apache Kafka topic. The `io.confluent.connect.avro.AvroConverter` translates messages from the Avro format. The message schemas are retrieved from Aiven's Karapace schema registry, as specified by the `schema.registry.url` parameter and related credentials.
The `key.converter` and `value.converter` settings in the connector configuration define how the connector parses messages. When using Avro as the source data format, set the following parameters:

- `value.converter.schema.registry.url`: Points to the Aiven for Apache Kafka schema registry URL, in the format `https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT`, where `APACHE_KAFKA_HOST` and `SCHEMA_REGISTRY_PORT` are the values retrieved in the prerequisites.
- `value.converter.basic.auth.credentials.source`: Set to `USER_INFO` to enable username and password access to the schema registry.
- `value.converter.schema.registry.basic.auth.user.info`: The schema registry credentials in the `SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD` format, using the values retrieved in the prerequisites.
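Before creating the connector, you can optionally confirm that the schema registry credentials work by querying Karapace's REST API. A minimal sketch using `curl`, with the placeholders standing for the values collected in the prerequisites:

```bash
# List the subjects registered in the Karapace schema registry.
# A successful JSON response confirms the URL and credentials are valid.
curl -u "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD" \
  "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT/subjects"
```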
Create a Kafka Connect connector with the Aiven Console
To create a Kafka Connect connector:
1. Log in to the Aiven Console and select the Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service where the connector needs to be defined.
2. Click Connectors in the sidebar.
3. Click Create connector to start setting up a new connector. This option is visible only if Kafka Connect is enabled for your service.
4. On the Select connector page, locate JDBC Sink and click Get started.
5. In the Common tab, find the Connector configuration text box.
6. Click Edit to modify the connector configuration.
7. Paste the configuration details from your `jdbc_sink.json` file into the text box.
8. Click Apply.

   Note: The Aiven Console automatically populates the UI fields with the data from the configuration file. You can review and edit these fields across the different tabs. Any modifications you make are updated in the Connector configuration text box in JSON format.

9. Once you've entered all the required settings, click Create connector.
10. Verify the connector status on the Connectors page.
11. Confirm that the data appears in the target database service. The table name matches the Apache Kafka topic name.
You can also create connectors using the Aiven CLI command.
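For example, a minimal sketch with the Aiven CLI, assuming a Kafka service named `kafka-demo` (substitute your own service name) and the `jdbc_sink.json` file defined earlier:

```bash
# Create the connector from the configuration file.
avn service connector create kafka-demo @jdbc_sink.json

# Check the connector status; CONNECTOR_NAME is the "name" value
# from jdbc_sink.json.
avn service connector status kafka-demo CONNECTOR_NAME
```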
Example: Create a JDBC sink connector to PostgreSQL® on a topic with a JSON schema
Suppose you have a topic named `iot_measurements` that contains data in JSON format with an embedded JSON schema, as in the following two sample messages:
{
"schema": {
"type":"struct",
"fields":[{
"type":"int64",
"optional": false,
"field": "iot_id"
},{
"type":"string",
"optional": false,
"field": "metric"
},{
"type":"int32",
"optional": false,
"field": "measurement"
}]
},
"payload":{ "iot_id":1, "metric":"Temperature", "measurement":14}
}
{
"schema": {
"type":"struct",
"fields":[{
"type":"int64",
"optional": false,
"field": "iot_id"
},{
"type":"string",
"optional": false,
"field": "metric"
},{
"type":"int32",
"optional": false,
"field": "measurement"
}]
},
"payload":{"iot_id":2, "metric":"Humidity", "measurement":60}
}
Embedding a JSON schema in every message can increase the data size. For a more efficient size-to-content ratio, consider using the Avro format with the Karapace schema registry.
To sink the `iot_measurements` topic to PostgreSQL, use the following connector configuration. Replace the placeholders for `DB_HOST`, `DB_PORT`, `DB_NAME`, `DB_SSL_MODE`, `DB_USERNAME`, and `DB_PASSWORD`:
{
"name":"sink_iot_json_schema",
"connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
"topics": "iot_measurements",
"connection.url": "jdbc:postgresql://DB_HOST:DB_PORT/DB_NAME?sslmode=DB_SSL_MODE",
"connection.user": "DB_USERNAME",
"connection.password": "DB_PASSWORD",
"tasks.max":"1",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "iot_id",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
Key aspects of the configuration:
"topics": "iot_measurements"
: Identifiesiot_measurements
topic as the data source for the sink operation."value.converter": "org.apache.kafka.connect.json.JsonConverter"
: Indicates that the message value is in plain JSON format without a schema. Since the key is empty, no converter is defined for it."pk.mode": "record_value"
: Indicates the connector is using the message value to set the target database key."pk.fields": "iot_id"
: Indicates the connector is using the fieldiot_id
on the message value to set the target database key.
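Once the connector is running, you can verify the sink directly in PostgreSQL. A minimal sketch using the `psql` client with the placeholder connection values from the configuration above (the table name matches the topic name):

```bash
# Query the auto-created table; its name matches the source topic.
psql "postgres://DB_USERNAME:DB_PASSWORD@DB_HOST:DB_PORT/DB_NAME?sslmode=DB_SSL_MODE" \
  -c "SELECT iot_id, metric, measurement FROM iot_measurements;"
```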
Example: Create a JDBC sink connector to MySQL on a topic using Avro and schema registry
Suppose you have a topic named `students` that contains data in Avro format. The schema is stored in the schema registry provided by Karapace and has the following structure:
key: {"student_id": 1234}
value: {"student_name": "Mary", "exam": "Math", "exam_result":"A"}
To sink the `students` topic to MySQL, use the following connector configuration. Make sure to replace the placeholders for `DB_HOST`, `DB_PORT`, `DB_NAME`, `DB_SSL_MODE`, `DB_USERNAME`, `DB_PASSWORD`, `APACHE_KAFKA_HOST`, `SCHEMA_REGISTRY_PORT`, `SCHEMA_REGISTRY_USER`, and `SCHEMA_REGISTRY_PASSWORD`:
{
"name": "sink_students_avro_schema",
"connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
"topics": "students",
"connection.url": "jdbc:mysql://DB_HOST:DB_PORT/DB_NAME?ssl-mode=DB_SSL_MODE",
"connection.user": "DB_USERNAME",
"connection.password": "DB_PASSWORD",
"insert.mode": "upsert",
"table.name.format": "students",
"pk.mode": "record_key",
"pk.fields": "student_id",
"auto.create": "true",
"auto.evolve": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
}
Key aspects of the configuration:
"topics": "students"
: Identifiesstudents
topic as the data source for the sink operation."pk.mode": "record_key"
: Uses the message key as the database key."pk.fields": "student_id"
: Sets the database key using thestudent_id
field from the message key.key.converter
andvalue.converter
: Defines the Avro data format withio.confluent.connect.avro.AvroConverter
and provides the URL and credentials for the Karapace schema registry.
With "auto.create": "true"
, the connector automatically creates a students
table
in the MySQL database. This table is populated with data from the students
Apache Kafka topic and includes the student_id
, student_name
, exam
, and
exam_result
columns.
Related pages
- View the Database migration with Apache Kafka® and Apache Kafka® Connect blog post