Use schema registry in Java with Aiven for Apache Kafka®

Aiven for Apache Kafka® provides schema registry functionality through Karapace. Karapace lets you store, retrieve, and evolve schemas without rebuilding producer or consumer code.

Workflow overview

To produce and consume Avro messages in Java using the schema registry:

  1. Define your schema.
  2. Generate Java classes from the schema.
  3. Add the required dependencies.
  4. Create a keystore and truststore. If you use SASL authentication, only the truststore is required.
  5. Configure your Kafka producer and consumer properties.

Prerequisites

To connect a Java application to an Aiven for Apache Kafka service using the schema registry, you need a running Aiven for Apache Kafka service with the Karapace schema registry enabled.

Variables

Replace the following placeholders in the example configuration:

BOOTSTRAPSERVERS: Kafka service URI. Available in Connection information on the service overview page in the Aiven Console.
KEYSTORE: Path to the keystore file.
KEYSTOREPASSWORD: Password for the keystore.
TRUSTSTORE: Path to the truststore file.
TRUSTSTOREPASSWORD: Password for the truststore.
SSLKEYPASSWORD: Password for the private key in the keystore.
SCHEMAREGISTRYURL: Schema Registry URI. Available in Connection information on the service overview page in the Aiven Console.
SCHEMAREGISTRYUSER: Schema Registry username. Available in Connection information on the service overview page in the Aiven Console.
SCHEMAREGISTRYPASSWORD: Schema Registry password. Available in Connection information on the service overview page in the Aiven Console.
TOPIC_NAME: Kafka topic name.

Define the schema

Create an Avro schema file. For example, save the following schema in a file named ClickRecord.avsc:

{"type": "record",
"name": "ClickRecord",
"namespace": "io.aiven.avro.example",
"fields": [
{"name": "session_id", "type": "string"},
{"name": "browser", "type": ["string", "null"]},
{"name": "campaign", "type": ["string", "null"]},
{"name": "channel", "type": "string"},
{"name": "referrer", "type": ["string", "null"], "default": "None"},
{"name": "ip", "type": ["string", "null"]}
]
}

This schema defines a record named ClickRecord in the namespace io.aiven.avro.example, with the fields session_id, browser, campaign, channel, referrer, and ip, and their corresponding data types.

Once the schema is defined, compile it either manually or automatically.

To generate Java classes from Avro, Protobuf, or JSON Schema files, see Generate Java classes from schemas.

Manual schema compilation

To compile manually, download avro-tools-1.12.0.jar from https://avro.apache.org/releases.html or fetch it using Maven:

mvn org.apache.maven.plugins:maven-dependency-plugin:2.8:get \
  -Dartifact=org.apache.avro:avro-tools:1.12.0:jar \
  -Ddest=avro-tools-1.12.0.jar

Note: If a newer version of avro-tools is available, use it instead of 1.12.0. To find the most recent release, see https://avro.apache.org/releases.html.

To compile the schema and generate a Java class, run:

java -jar avro-tools-1.12.0.jar compile schema ClickRecord.avsc .

This generates a Java file named ClickRecord.java in a subdirectory that matches the namespace defined in your schema (for example, io.aiven.avro.example). The required package structure is created automatically if it does not already exist.
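
You can then use the generated class like any other Java class. The following is a minimal, hypothetical sketch that creates a ClickRecord with the builder that Avro generates; fields without a schema default (every field except referrer in this schema) must be set before calling build():

package io.aiven.avro.example;

public class ClickRecordBuilderExample {
    public static void main(String[] args) {
        // Build a ClickRecord with the builder generated by avro-tools.
        // Fields without a schema default must be set before build().
        ClickRecord click = ClickRecord.newBuilder()
                .setSessionId("session-1")
                .setBrowser("Firefox")
                .setCampaign(null)
                .setChannel("web")
                .setIp("203.0.113.1")
                .build();
        System.out.println(click);
    }
}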

Automatic schema compilation

To compile the schema automatically during the project build, use the avro-maven-plugin (Maven) or the gradle-avro-plugin (Gradle). The following is an example avro-maven-plugin configuration for a schema stored at src/main/avro/ClickRecord.avsc:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.11.0</version>
  <executions>
    <execution>
      <id>schemas</id>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
        <goal>protocol</goal>
        <goal>idl-protocol</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/generated-sources/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

The generated classes are placed in the src/main/generated-sources directory.
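
The generated classes and the producer and consumer code also need the Apache Kafka client, the Avro runtime, and the Avro serializer libraries on the classpath (step 3 of the workflow). The following is a minimal Maven sketch; the versions shown are examples only, and io.confluent:kafka-avro-serializer is published in the Confluent Maven repository rather than Maven Central, which is why that repository is declared here:

<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.5.0</version>
  </dependency>
</dependencies>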

Configure producer and consumer properties

For complete example code that shows how to create producers and consumers using the Schema Registry with Aiven for Apache Kafka, see the Aiven examples GitHub repository.

The following sections list the required configuration properties.

Producer configuration

// Kafka connection and TLS settings
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, [BOOTSTRAPSERVERS]);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, [TRUSTSTORE]);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, [TRUSTSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, [KEYSTORE]);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, [KEYSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, [SSLKEYPASSWORD]);
// Karapace schema registry connection and authentication
props.put("schema.registry.url", [SCHEMAREGISTRYURL]);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", [SCHEMAREGISTRYUSER] + ":" + [SCHEMAREGISTRYPASSWORD]);
// Serializers: string keys, Avro-encoded values
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
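
With these properties collected in a java.util.Properties object named props, a producer that sends ClickRecord values can be sketched as follows. This is an illustrative example rather than the complete Aiven sample: the class name is hypothetical, TOPIC_NAME is the placeholder from the variables section, and the record is created with the all-arguments constructor that Avro generates in schema field order (session_id, browser, campaign, channel, referrer, ip):

package io.aiven.avro.example;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ClickRecordProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Set the producer properties listed above, with placeholders replaced.

        // Create a record with the generated all-arguments constructor
        // (field order: session_id, browser, campaign, channel, referrer, ip).
        ClickRecord click = new ClickRecord("session-1", "Firefox", null, "web", "None", "203.0.113.1");

        try (KafkaProducer<String, ClickRecord> producer = new KafkaProducer<>(props)) {
            // Use the session id as the message key; replace TOPIC_NAME with your topic.
            producer.send(new ProducerRecord<>("TOPIC_NAME", click.getSessionId().toString(), click));
            producer.flush();
        }
    }
}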

Consumer configuration

// Kafka connection and TLS settings
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, [BOOTSTRAPSERVERS]);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, [TRUSTSTORE]);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, [TRUSTSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, [KEYSTORE]);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, [KEYSTOREPASSWORD]);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, [SSLKEYPASSWORD]);
// Karapace schema registry connection and authentication
props.put("schema.registry.url", [SCHEMAREGISTRYURL]);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", [SCHEMAREGISTRYUSER] + ":" + [SCHEMAREGISTRYPASSWORD]);
// Deserializers: string keys, Avro values returned as the generated ClickRecord class
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
// Consumer group id
props.put(ConsumerConfig.GROUP_ID_CONFIG, "clickrecord-example-group");

Replace the placeholders with the values from the variables section.
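
With the consumer properties collected in a java.util.Properties object named props, a minimal consumer loop can be sketched as follows. Again, this is an illustrative example: the class name is hypothetical and TOPIC_NAME is the placeholder from the variables section. Because specific.avro.reader is enabled above, the deserializer returns instances of the generated ClickRecord class:

package io.aiven.avro.example;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ClickRecordConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Set the consumer properties listed above, with placeholders replaced.

        try (KafkaConsumer<String, ClickRecord> consumer = new KafkaConsumer<>(props)) {
            // Replace TOPIC_NAME with your topic.
            consumer.subscribe(Collections.singletonList("TOPIC_NAME"));
            while (true) {
                ConsumerRecords<String, ClickRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, ClickRecord> record : records) {
                    ClickRecord click = record.value();
                    System.out.printf("session=%s channel=%s%n", click.getSessionId(), click.getChannel());
                }
            }
        }
    }
}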