Bring your own Apache Kafka® Connect cluster
Aiven provides Apache Kafka® Connect as a managed service in combination with the Aiven for Apache Kafka® managed service. However, there are circumstances where you may want to run your own Apache Kafka Connect cluster.
Integrate your own Apache Kafka Connect cluster with Aiven for Apache Kafka and use the schema registry offered by Karapace. The example below shows how to create a JDBC sink connector to a PostgreSQL® database.
Prerequisites
To bring your own Apache Kafka Connect cluster, you need an Aiven for Apache Kafka service up and running.
For the JDBC sink connector database example, collect the following information about the Aiven for Apache Kafka service and the target database upfront:
- APACHE_KAFKA_HOST: The hostname of the Apache Kafka service
- APACHE_KAFKA_PORT: The port of the Apache Kafka service
- REST_API_PORT: The Apache Kafka REST API port, only needed when testing the data flow with the REST APIs
- REST_API_USERNAME: The Apache Kafka REST API username, only needed when testing the data flow with the REST APIs
- REST_API_PASSWORD: The Apache Kafka REST API password, only needed when testing the data flow with the REST APIs
- SCHEMA_REGISTRY_PORT: The Apache Kafka schema registry port, only needed when using Avro as the data format
- SCHEMA_REGISTRY_USER: The Apache Kafka schema registry username, only needed when using Avro as the data format
- SCHEMA_REGISTRY_PASSWORD: The Apache Kafka schema registry password, only needed when using Avro as the data format
- PG_HOST: The PostgreSQL service hostname
- PG_PORT: The PostgreSQL service port
- PG_USERNAME: The PostgreSQL service username
- PG_PASSWORD: The PostgreSQL service password
- PG_DATABASE_NAME: The PostgreSQL service database name
If you're using Aiven for PostgreSQL and Aiven for Apache Kafka, the above details are available in the Aiven Console service Overview tab or via the dedicated avn service get command with the Aiven CLI.
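For example, a minimal sketch with the Aiven CLI, assuming an authenticated avn session and hypothetical service names demo-kafka and demo-pg:

 # show the full connection details of the Aiven for Apache Kafka service as JSON
 avn service get demo-kafka --json
 # show the full connection details of the Aiven for PostgreSQL service as JSON
 avn service get demo-pg --json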
Attach your own Apache Kafka Connect cluster to Aiven for Apache Kafka®
The following example demonstrates how to set up a local Apache Kafka Connect cluster with a working JDBC sink connector and attach it to an Aiven for Apache Kafka service.
Set up the truststore and keystore
Create a Java keystore and truststore for the Aiven for Apache Kafka service; a possible way to create them is sketched after the list below. For the following example we assume:
- The keystore is available at KEYSTORE_PATH/client.keystore.p12
- The truststore is available at TRUSTSTORE_PATH/client.truststore.jks
- For simplicity, the same secret (password) is used for both the keystore and the truststore, and is shown as KEY_TRUST_SECRET
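As a minimal sketch, assuming you have already downloaded the service's service.cert, service.key and the project ca.pem (for example from the Aiven Console service Overview page) into a local certs folder:

 # create the PKCS12 keystore from the service certificate and key
 openssl pkcs12 -export -inkey certs/service.key -in certs/service.cert \
   -out KEYSTORE_PATH/client.keystore.p12 -name service_key -passout pass:KEY_TRUST_SECRET
 # create the JKS truststore containing the project CA certificate
 keytool -import -file certs/ca.pem -alias CA \
   -keystore TRUSTSTORE_PATH/client.truststore.jks -storepass KEY_TRUST_SECRET -noprompt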
Configure the Aiven for Apache Kafka service
Enable the schema registry features offered by Karapace. You can do this in the Aiven Console, in the Aiven for Apache Kafka service Overview tab (an Aiven CLI alternative is sketched after the list):
- Enable Schema Registry (Karapace) and Apache Kafka REST API (Karapace)
- In the Topics tab, create a topic called jdbc_sink; this topic will be used by the Apache Kafka Connect connector
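Alternatively, a minimal sketch with the Aiven CLI, assuming an authenticated avn session and a hypothetical service name demo-kafka:

 # enable the Karapace schema registry and REST APIs on the service
 avn service update demo-kafka -c schema_registry=true -c kafka_rest=true
 # create the topic used by the connector
 avn service topic-create demo-kafka jdbc_sink --partitions 3 --replication 2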
Download the required binaries
The following binaries are needed to set up an Apache Kafka Connect cluster locally:
- Apache Kafka
- Aiven's JDBC connector for Apache Kafka
- The Avro value converter, if you are going to use Avro as the data format; the examples below use Avro (download locations are sketched after this list)
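For illustration, assuming wget is available and the versions used in this example, the Apache Kafka binaries can be downloaded as follows; the connector and converter archives are available from their respective release pages (exact asset URLs may vary by version):

 # download the Apache Kafka binaries (Scala 2.13 build, version 3.1.0)
 wget https://archive.apache.org/dist/kafka/3.1.0/kafka_2.13-3.1.0.tgz
 # JDBC connector releases: https://github.com/aiven/jdbc-connector-for-apache-kafka/releases
 # Avro converter: https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter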
Set up the local Apache Kafka Connect cluster
The following process defines the setup required to create a local Apache Kafka
Connect cluster with Apache Kafka 3.1.0, Avro converter 7.1.0 and JDBC
connector 6.7.0:
- Extract the Apache Kafka binaries:

 tar -xzf kafka_2.13-3.1.0.tgz
- Within the newly created kafka_2.13-3.1.0 folder, create a plugins folder containing a lib sub-folder:

 cd kafka_2.13-3.1.0
 mkdir -p plugins/lib
- Unzip the JDBC and Avro binaries and copy the jar files into the plugins/lib folder:

 # extract the Aiven JDBC connector
 unzip jdbc-connector-for-apache-kafka-6.7.0.zip
 # extract the Confluent Kafka Connect Avro converter
 unzip confluentinc-kafka-connect-avro-converter-7.1.0.zip
 # copy the plugins into the plugins/lib folder
 cp jdbc-connector-for-apache-kafka-6.7.0/*.jar plugins/lib/
 cp confluentinc-kafka-connect-avro-converter-7.1.0/*.jar plugins/lib/
- Create a properties file, my-connect-distributed.properties, under the main kafka_2.13-3.1.0 folder, for the Apache Kafka Connect settings. Change the following placeholders:
  - PATH_TO_KAFKA_HOME to the path to the kafka_2.13-3.1.0 folder
  - APACHE_KAFKA_HOST, APACHE_KAFKA_PORT, SCHEMA_REGISTRY_PORT, SCHEMA_REGISTRY_USER, SCHEMA_REGISTRY_PASSWORD to the related parameters fetched in the prerequisite step
  - KEYSTORE_PATH, TRUSTSTORE_PATH and KEY_TRUST_SECRET to the keystore and truststore locations and the related secret, as defined in the keystore and truststore step
 # Defines the folder for plugins, including the JDBC connector and Avro converter
 plugin.path=PATH_TO_KAFKA_HOME/kafka_2.13-3.1.0/plugins
 # Defines the location of the Apache Kafka bootstrap servers
 bootstrap.servers=APACHE_KAFKA_HOST:APACHE_KAFKA_PORT
 # Defines the group.id used by the Connect cluster
 group.id=connect-cluster
 # Defines the input data format for key and value: JSON without schema
 key.converter=org.apache.kafka.connect.json.JsonConverter
 value.converter=org.apache.kafka.connect.json.JsonConverter
 key.converter.schemas.enable=false
 value.converter.schemas.enable=false
 # Defines the internal data format for key and value: JSON without schema
 internal.key.converter=org.apache.kafka.connect.json.JsonConverter
 internal.value.converter=org.apache.kafka.connect.json.JsonConverter
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 # Connect clusters create three topics to manage offsets, configs, and status
 # information. Note that these contribute towards the total partition limit quota.
 offset.storage.topic=connect-offsets
 offset.storage.replication.factor=3
 offset.storage.partitions=3
 config.storage.topic=connect-configs
 config.storage.replication.factor=3
 status.storage.topic=connect-status
 status.storage.replication.factor=3
 # Defines the flush interval for the offset communication
 offset.flush.interval.ms=10000
 # Defines the SSL endpoint
 ssl.endpoint.identification.algorithm=https
 request.timeout.ms=20000
 retry.backoff.ms=500
 security.protocol=SSL
 ssl.protocol=TLS
 ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
 ssl.truststore.password=KEY_TRUST_SECRET
 ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
 ssl.keystore.password=KEY_TRUST_SECRET
 ssl.key.password=KEY_TRUST_SECRET
 ssl.keystore.type=PKCS12
 # Defines the consumer SSL endpoint
 consumer.ssl.endpoint.identification.algorithm=https
 consumer.request.timeout.ms=20000
 consumer.retry.backoff.ms=500
 consumer.security.protocol=SSL
 consumer.ssl.protocol=TLS
 consumer.ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
 consumer.ssl.truststore.password=KEY_TRUST_SECRET
 consumer.ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
 consumer.ssl.keystore.password=KEY_TRUST_SECRET
 consumer.ssl.key.password=KEY_TRUST_SECRET
 consumer.ssl.keystore.type=PKCS12
 # Defines the producer SSL endpoint
 producer.ssl.endpoint.identification.algorithm=https
 producer.request.timeout.ms=20000
 producer.retry.backoff.ms=500
 producer.security.protocol=SSL
 producer.ssl.protocol=TLS
 producer.ssl.truststore.location=TRUSTSTORE_PATH/client.truststore.jks
 producer.ssl.truststore.password=KEY_TRUST_SECRET
 producer.ssl.keystore.location=KEYSTORE_PATH/client.keystore.p12
 producer.ssl.keystore.password=KEY_TRUST_SECRET
 producer.ssl.key.password=KEY_TRUST_SECRET
 producer.ssl.keystore.type=PKCS12
- Start the local Apache Kafka Connect cluster, executing the following from the kafka_2.13-3.1.0 folder:

 ./bin/connect-distributed.sh ./my-connect-distributed.properties
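Once the cluster is up, a quick sanity check, assuming the default Kafka Connect REST port 8083 and that jq is installed, is to list the loaded connector plugins and verify that the JDBC connector and the Avro converter appear:

 # list the connector plugins loaded from plugins/lib
 curl -s localhost:8083/connector-plugins | jq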
Add the JDBC sink connector
To add a JDBC connector to the local Apache Kafka Connect cluster:
- Create the JDBC sink connector JSON configuration file, named jdbc-sink-pg.json, with the following content, replacing the placeholders PG_HOST, PG_PORT, PG_USERNAME, PG_PASSWORD, PG_DATABASE_NAME, APACHE_KAFKA_HOST, SCHEMA_REGISTRY_PORT, SCHEMA_REGISTRY_USER, SCHEMA_REGISTRY_PASSWORD:

 {
   "name": "jdbc-sink-pg",
   "config": {
     "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
     "connection.url": "jdbc:postgresql://PG_HOST:PG_PORT/PG_DATABASE_NAME?user=PG_USERNAME&password=PG_PASSWORD&ssl=required",
     "tasks.max": "1",
     "topics": "jdbc_sink",
     "auto.create": "true",
     "value.converter": "io.confluent.connect.avro.AvroConverter",
     "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
     "value.converter.basic.auth.credentials.source": "USER_INFO",
     "value.converter.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
   }
 }
- Create the JDBC sink connector instance using the Kafka Connect REST APIs:

 curl -s -H "Content-Type: application/json" -X POST \
   -d @jdbc-sink-pg.json \
   http://localhost:8083/connectors/
- Check the status of the JDBC sink connector instance (jq is used to beautify the output); if the connector is not in a RUNNING state, see the restart sketch after this list:

 curl localhost:8083/connectors/jdbc-sink-pg/status | jq

 The result should be similar to the following:

 {
   "name": "jdbc-sink-pg",
   "connector": {
     "state": "RUNNING",
     "worker_id": "10.128.0.12:8083"
   },
   "tasks": [
     {
       "id": 0,
       "state": "RUNNING",
       "worker_id": "10.128.0.12:8083"
     }
   ],
   "type": "sink"
 }
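If the connector or one of its tasks reports a FAILED state, you can restart it via the Kafka Connect REST APIs; the task id 0 below matches the single-task configuration used in this example:

 # restart the whole connector
 curl -X POST localhost:8083/connectors/jdbc-sink-pg/restart
 # restart an individual task
 curl -X POST localhost:8083/connectors/jdbc-sink-pg/tasks/0/restart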
Check the dedicated blog post for an end-to-end example of how to set up a Kafka Connect cluster to host a custom connector.
Verify the JDBC connector using Karapace REST APIs
To verify that the connector is working, you can write messages to the
jdbc_sink topic in Avro format using Karapace REST
APIs:
- Create an Avro schema using the /subjects/ endpoint, after changing the placeholders for REST_API_USER, REST_API_PASSWORD, APACHE_KAFKA_HOST and REST_API_PORT:

 curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
   --data '''
   {"schema":
     "{\"type\": \"record\",\"name\": \"jdbcsinkexample\",\"namespace\": \"example\",\"doc\": \"example\",\"fields\": [{ \"type\": \"string\", \"name\": \"name\", \"doc\": \"person name\", \"namespace\": \"example\", \"default\": \"mario\"},{ \"type\": \"int\", \"name\": \"age\", \"doc\": \"persons age\", \"namespace\": \"example\", \"default\": 5}]}"
   }''' \
   https://REST_API_USER:REST_API_PASSWORD@APACHE_KAFKA_HOST:REST_API_PORT/subjects/jdbcsinkexample/versions/

 The above call creates a new schema called jdbcsinkexample containing two fields (name and age).
- Create a message in the jdbc_sink topic using the jdbcsinkexample schema, after changing the placeholders for REST_API_USER, REST_API_PASSWORD, APACHE_KAFKA_HOST and REST_API_PORT:

 curl -H "Content-Type: application/vnd.kafka.avro.v2+json" -X POST \
   -d '''
   {"value_schema":
     "{\"namespace\": \"test\", \"type\": \"record\", \"name\": \"example\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"},{\"name\": \"age\", \"type\": \"int\"}]}",
   "records": [{"value": {"name": "Eric","age":77}}]}''' \
   https://REST_API_USER:REST_API_PASSWORD@APACHE_KAFKA_HOST:REST_API_PORT/topics/jdbc_sink
- Verify the presence of a table called jdbc_sink in PostgreSQL containing a row with name Eric and age 77; a possible check with psql is sketched below.
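As a minimal sketch, assuming the psql client is installed and using the PostgreSQL connection parameters collected in the prerequisite step:

 # query the table created by the connector
 psql "host=PG_HOST port=PG_PORT dbname=PG_DATABASE_NAME user=PG_USERNAME password=PG_PASSWORD sslmode=require" \
   -c 'SELECT * FROM jdbc_sink;'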