Skip to main content

Use schema registry in Java with Aiven for Apache Kafka®

Aiven for Apache Kafka® provides the schema registry functionality via Karapace, including the possibility to store, retrieve a schema, and also evolve it without needing to rebuild existing Consumers and Producers code.

note

To use the schema registry functionality exposed by Karapace, you need to enable the Schema Registry (Karapace) functionality for the Aiven for Apache Kafka instance selected from the Aiven console, service Overview tab.

To use schema registry:

  1. Create version 1 of a schema
  2. Use Apache Avro to compile the schema
  3. Create consumer and producer that utilize Aiven for Apache Kafka and Schema Registry

Prerequisites

To be able to produce and consume data in Java from an Aiven for Apache Kafka service, create a Keystore and Truststore containing the SSL certificates.

You can do it manually by downloading the certificates and composing the files locally on your machine or using the dedicated Aiven CLI command.

Variables

These are the placeholders you will need to replace in the code sample:

VariableDescription
BOOTSTRAPSERVERSService URI for Kafka connection, can be obtained from the Aiven console, service Overview tab
KEYSTOREPath to the keystore
KEYSTOREPASSWORDPassword for the keystore
TRUSTSTOREPath to the truststore
TRUSTSTOREPASSWORDPassword for the truststore
SSLKEYPASSWORDThe password of the private key in the key store file
SCHEMAREGISTRYURLService Registry URI for Apache Kafka connection
SCHEMAREGISTRYUSERService Registry username, can be obtained from the Aiven console, service Overview tab
SCHEMAREGISTRYPASSWORDService Registry password, can be obtained from the Aiven console, service Overview tab
TOPIC_NAMEApache Kafka topic name to use

Create version 1 of the Avro schema

To create an Avro schema, you need a definition file. As example you can use a click record schema defined in JSON and stored in a file named ClickRecord.avsc containing the following:

{"type": "record",
"name": "ClickRecord",
"namespace": "io.aiven.avro.example",
"fields": [
{"name": "session_id", "type": "string"},
{"name": "browser", "type": ["string", "null"]},
{"name": "campaign", "type": ["string", "null"]},
{"name": "channel", "type": "string"},
{"name": "referrer", "type": ["string", "null"], "default": "None"},
{"name": "ip", "type": ["string", "null"]}
]
}

The JSON configuration above defines a schema named ClickRecord in the namespace io.aiven.avro.example with fields session_id, browser, campaign, channel, referrer and ip and related data types.

Once the schema is defined, compile it either manually or automatically.

Manual schema compilation

In case of manual schema compilation, download avro-tools-1.11.0.jar from https://avro.apache.org/releases.html or via maven using the following:

mvn org.apache.maven.plugins:maven-dependency-plugin:2.8:get -Dartifact=org.apache.avro:avro-tools:1.11.0:jar -Ddest=avro-tools-1.11.0.jar

The schema defined in the previous step, can be now compiled to produce a Java class ClickRecord.java in the io.aiven.avro.example package (taken from the namespace parameter):

java -jar avro-tools-1.11.0.jar compile schema ClickRecord.avsc .
note

The package structure will also be generated if missing.

Auto schema compilation

With auto, the schema is compiled during the project build with, for example, maven-avro-plugin or gradle-avro-plugin. The following is a configuration example for maven-avro-plugin when ClickRecord.avsc is stored in the path src/main/avro/ClickRecord.avsc:

[](plugin)
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro-maven-plugin.version}</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/generated-sources/</outputDirectory>
</configuration>
</execution>
</executions>
[](/plugin)

The automatically generated Avro-schema code will be stored under the folder ${project.basedir}/src/main/generated-sources/.

Set consumer and producer properties for schema registry

The full code to create consumer and producers using the Schema Registry in Aiven for Apache Kafka can be found in the Aiven examples GitHub repository. The following contains a list of the properties required.

For producers, specify:

props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, [BOOTSTRAPSERVERS]);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, [TRUSTSTORE]);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, [TRUSTSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, [KEYSTORE]);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, [KEYSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, [SSLKEYPASSWORD]);
props.put("schema.registry.url", [SCHEMAREGISTRYURL]);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", [SCHEMAREGISTRYUSER] + ":" + [SCHEMAREGISTRYPASSWORD]);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

For consumers, specify:

props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, [BOOTSTRAPSERVERS]);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, [TRUSTSTORE]);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, [TRUSTSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, [KEYSTORE]);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, [KEYSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, [SSLKEYPASSWORD]);
props.put("schema.registry.url", [SCHEMAREGISTRYURL]);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", [SCHEMAREGISTRYUSER] + ":" + [SCHEMAREGISTRYPASSWORD]);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "clickrecord-example-group");
note

In the above properties replace all the required input parameters (within square brackets) with the appropriate information defined in the Variables section.