Use schema registry in Java with Aiven for Apache Kafka®
Aiven for Apache Kafka® provides schema registry functionality through Karapace, which lets you store, retrieve, and evolve schemas without rebuilding existing consumer and producer code.
To use the schema registry functionality exposed by Karapace, enable Schema Registry (Karapace) for your Aiven for Apache Kafka service on the service Overview tab in the Aiven console.
To use schema registry:
- Create version 1 of a schema
- Use Apache Avro to compile the schema
- Create a consumer and a producer that use Aiven for Apache Kafka and the schema registry
Prerequisites
To be able to produce and consume data in Java from an Aiven for Apache Kafka service, create a Keystore and Truststore containing the SSL certificates.
You can do this manually, by downloading the certificates and composing the files locally on your machine, or with the dedicated Aiven CLI command.
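Before wiring the stores into a client, it can help to confirm that each file loads with the password you expect. The following is a minimal sketch that uses only the JDK's java.security.KeyStore API. The class name StoreCheck and the method listAliases are hypothetical, and the store type you pass ("PKCS12" or "JKS") is an assumption that depends on how you created the files.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the Aiven examples: loads a keystore or
// truststore file and returns the aliases it contains.
class StoreCheck {

    static List<String> listAliases(String path, String type, String password) {
        try (InputStream in = new FileInputStream(path)) {
            KeyStore store = KeyStore.getInstance(type);   // for example "PKCS12" or "JKS"
            store.load(in, password.toCharArray());        // throws if the file cannot be read or verified
            return Collections.list(store.aliases());
        } catch (IOException | GeneralSecurityException e) {
            // Wrap checked exceptions so callers get one failure type
            throw new IllegalStateException("Cannot load store " + path, e);
        }
    }
}
```

Calling StoreCheck.listAliases with the KEYSTORE path and KEYSTOREPASSWORD should return at least one alias for a keystore that holds your client key; an exception usually points to a wrong path, type, or password.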
Variables
These are the placeholders you will need to replace in the code sample:
Variable | Description |
---|---|
BOOTSTRAPSERVERS | Service URI for Kafka connection, can be obtained from the Aiven console, service Overview tab |
KEYSTORE | Path to the keystore |
KEYSTOREPASSWORD | Password for the keystore |
TRUSTSTORE | Path to the truststore |
TRUSTSTOREPASSWORD | Password for the truststore |
SSLKEYPASSWORD | The password of the private key in the key store file |
SCHEMAREGISTRYURL | Schema Registry URI, can be obtained from the Aiven console, service Overview tab |
SCHEMAREGISTRYUSER | Schema Registry username, can be obtained from the Aiven console, service Overview tab |
SCHEMAREGISTRYPASSWORD | Schema Registry password, can be obtained from the Aiven console, service Overview tab |
TOPIC_NAME | Apache Kafka topic name to use |
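
One way to catch a forgotten placeholder early is to check all of them before creating any client. The sketch below is only an illustration: the class RequiredSettings is hypothetical, and reading the values from environment variables named after the placeholders is an assumption, not something the Aiven examples require.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: verifies that every placeholder from the table has a value.
class RequiredSettings {

    // Names copied from the Variables table.
    static final List<String> NAMES = List.of(
        "BOOTSTRAPSERVERS", "KEYSTORE", "KEYSTOREPASSWORD", "TRUSTSTORE",
        "TRUSTSTOREPASSWORD", "SSLKEYPASSWORD", "SCHEMAREGISTRYURL",
        "SCHEMAREGISTRYUSER", "SCHEMAREGISTRYPASSWORD", "TOPIC_NAME");

    // Returns the names that are absent or blank in the given source.
    static List<String> missing(Map<String, String> source) {
        List<String> result = new ArrayList<>();
        for (String name : NAMES) {
            String value = source.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption: the values are exported as environment variables.
        List<String> absent = missing(System.getenv());
        System.out.println(absent.isEmpty() ? "All settings present" : "Missing: " + absent);
    }
}
```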
Create version 1 of the Avro schema
To create an Avro schema, you need a definition file. As an example, you can use a click record schema defined in JSON and stored in a file named ClickRecord.avsc containing the following:
{"type": "record",
"name": "ClickRecord",
"namespace": "io.aiven.avro.example",
"fields": [
{"name": "session_id", "type": "string"},
{"name": "browser", "type": ["string", "null"]},
{"name": "campaign", "type": ["string", "null"]},
{"name": "channel", "type": "string"},
{"name": "referrer", "type": ["string", "null"], "default": "None"},
{"name": "ip", "type": ["string", "null"]}
]
}
The JSON configuration above defines a schema named ClickRecord in the namespace io.aiven.avro.example with the fields session_id, browser, campaign, channel, referrer, and ip, and their data types.
Once the schema is defined, compile it either manually or automatically.
Manual schema compilation
For manual schema compilation, download avro-tools-1.11.0.jar from https://avro.apache.org/releases.html or fetch it with Maven using the following command:
mvn org.apache.maven.plugins:maven-dependency-plugin:2.8:get -Dartifact=org.apache.avro:avro-tools:1.11.0:jar -Ddest=avro-tools-1.11.0.jar
The schema defined in the previous step can now be compiled to produce a Java class ClickRecord.java in the io.aiven.avro.example package (taken from the namespace parameter):
java -jar avro-tools-1.11.0.jar compile schema ClickRecord.avsc .
The package structure will also be generated if missing.
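To make the resulting layout concrete, the following small sketch derives the relative location of the generated file from the schema namespace and record name. The class GeneratedPath and its method are hypothetical and exist only for illustration; the path is relative to the output directory given as the last argument of the compile command.

```java
// Hypothetical helper: shows where the generated class ends up,
// relative to the output directory passed to avro-tools.
class GeneratedPath {

    // Each dot in the namespace becomes a directory level.
    static String javaFileFor(String namespace, String recordName) {
        return namespace.replace('.', '/') + "/" + recordName + ".java";
    }

    public static void main(String[] args) {
        System.out.println(javaFileFor("io.aiven.avro.example", "ClickRecord"));
        // prints io/aiven/avro/example/ClickRecord.java
    }
}
```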
Auto schema compilation
With automatic compilation, the schema is compiled during the project build with, for example, avro-maven-plugin or gradle-avro-plugin. The following is a configuration example for avro-maven-plugin when ClickRecord.avsc is stored in the path src/main/avro/ClickRecord.avsc:
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro-maven-plugin.version}</version>
    <executions>
        <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
                <goal>protocol</goal>
                <goal>idl-protocol</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/generated-sources/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
The automatically generated Avro-schema code will be stored under the folder ${project.basedir}/src/main/generated-sources/.
Set consumer and producer properties for schema registry
The full code to create consumers and producers using the schema registry in Aiven for Apache Kafka is available in the Aiven examples GitHub repository. The following lists the required properties.
For producers, specify:
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, [BOOTSTRAPSERVERS]);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, [TRUSTSTORE]);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, [TRUSTSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, [KEYSTORE]);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, [KEYSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, [SSLKEYPASSWORD]);
props.put("schema.registry.url", [SCHEMAREGISTRYURL]);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", [SCHEMAREGISTRYUSER] + ":" + [SCHEMAREGISTRYPASSWORD]);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
For consumers, specify:
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, [BOOTSTRAPSERVERS]);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, [TRUSTSTORE]);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, [TRUSTSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, [KEYSTORE]);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, [KEYSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, [SSLKEYPASSWORD]);
props.put("schema.registry.url", [SCHEMAREGISTRYURL]);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", [SCHEMAREGISTRYUSER] + ":" + [SCHEMAREGISTRYPASSWORD]);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "clickrecord-example-group");
In the properties above, replace the input parameters in square brackets with the values described in the Variables section.
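Because the producer and consumer settings differ only in their last few lines, the shared part can be built once and reused. The following is a minimal sketch of that idea, not the code from the Aiven examples repository: the class ClientProps and its methods are hypothetical, and plain string keys are used in place of the constants so that the block compiles without the Kafka libraries on the classpath (for example, "bootstrap.servers" is the value of CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG).

```java
import java.util.Properties;

// Hypothetical helper class that builds the properties listed above.
class ClientProps {

    // Connection, SSL, and schema registry settings shared by producers and consumers.
    static Properties common(String bootstrapServers, String truststore, String truststorePassword,
                             String keystore, String keystorePassword, String keyPassword,
                             String registryUrl, String registryUser, String registryPassword) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststore);
        props.put("ssl.truststore.password", truststorePassword);
        props.put("ssl.keystore.type", "PKCS12");
        props.put("ssl.keystore.location", keystore);
        props.put("ssl.keystore.password", keystorePassword);
        props.put("ssl.key.password", keyPassword);
        props.put("schema.registry.url", registryUrl);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", registryUser + ":" + registryPassword);
        return props;
    }

    // Adds the serializers a producer needs to a copy of the shared settings.
    static Properties producer(Properties common) {
        Properties props = new Properties();
        props.putAll(common);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        return props;
    }

    // Adds the deserializers and consumer group to a copy of the shared settings.
    static Properties consumer(Properties common, String groupId) {
        Properties props = new Properties();
        props.putAll(common);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("specific.avro.reader", "true"); // return generated classes such as ClickRecord
        props.put("group.id", groupId);
        return props;
    }
}
```

The returned Properties objects can be passed to the KafkaProducer and KafkaConsumer constructors in the same way as the props variable in the listings above. Copying the shared settings rather than mutating them keeps producer-only and consumer-only keys from leaking into each other.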