Use Apache Kafka® Streams with Aiven for Apache Kafka®
Apache Kafka® streams and streams API allows streaming data through the heart of Apache Kafka: the brokers.
As the data, a key-value structure, enters the brokers, it is serialized by the streams API to a byte array. The opposite happens when data exits the brokers, where the streams API de-serializes it back to the original key-value structure.
Apache Kafka streams also allows data transformation in real-time with the output feeding another, transformed, stream of data. This makes Kafka streams a powerful tool for the variety of use cases it can address in the world of real-time data processing and analysis.
The following article explores how to realize Kafka streams with Aiven for Apache Kafka and the schema registry functionality offered by Karapace.
Prerequisites
To use Kafka streams, you need an Aiven for Apache Kafka service up and running with Schema Registry (Karapace) Enabled.
Enabling Schema Registry (Karapace) is needed to make use of the schema registry features necessary for dealing with messages in Avro format. If you plan to use JSON format, enabling Schema Registry (Karapace) might not be necessary.
Two topics named song-feed
and play-events
should be created.
For the example, collect the following
information about the Aiven for Apache Kafka service:
APACHE_KAFKA_HOST
: The hostname of the Apache Kafka serviceAPACHE_KAFKA_PORT
: The port of the Apache Kafka serviceSCHEMA_REGISTRY_PORT
: The Apache Kafka's schema registry port, only needed when using Avro as data formatSCHEMA_REGISTRY_USER
: The Apache Kafka's schema registry username, only needed when using Avro as data formatSCHEMA_REGISTRY_PASSWORD
: The Apache Kafka's schema registry user password, only needed when using Avro as data format
If you're using Aiven for Apache Kafka the above details are available
in the Aiven console service Overview tab
or via the dedicated avn service get
command with the
Aiven CLI.
The following example assumes you have Apache Maven already installed.
Use Kafka streams with Aiven for Apache Kafka - KafkaMusicExample
The following example shows how to customise the KafkaMusicExample
available in the dedicated
repository to
work with Aiven for Apache Kafka.
-
Download the
kafka-streams-examples
sources from GitHubgit clone https://github.com/confluentinc/kafka-streams-examples.git
-
Build the packages using Maven
cd kafka-streams-examples/
mvn -DskipTests=true clean package
Setup the truststore and keystore
Create a Java keystore and truststore for the Aiven for Apache Kafka service. For the following example we assume:
- The keystore is available at
KEYSTORE_PATH/client.keystore.p12
- The truststore is available at
TRUSTSTORE_PATH/client.truststore.jks
- For simplicity, the same secret (password) is used for both the
keystore and the truststore, and is shown as
KEY_TRUST_SECRET
Customizing the Java applications
The KafkaMusicExample
example in the repository is constituted by two
classes under the
src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic
folder:
KafkaMusicExampleDriver.java
: an Apache Kafka producer writing messages to a topic namedsong-feed
KafkaMusicExample.java
: a Kafka stream application reading from thesong-feed
topic and calculating aggregated metrics
To have the two applications working with Aiven for Apache Kafka we need to customise the files to use the right endpoints.
Customize KafkaMusicExampleDriver.java
-
Add the following dependencies
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.HashMap; -
After the
KafkaMusicExampleDriver
class declaration add the following two lines to set theDEFAULT_BOOTSTRAP_SERVERS
andDEFAULT_SCHEMA_REGISTRY_URL
endpoints replacing theAPACHE_KAFKA_HOST
,APACHE_KAFKA_PORT
,APACHE_KAFKA_HOST
,SCHEMA_REGISTRY_PORT
placeholdersprivate static final String DEFAULT_BOOTSTRAP_SERVERS = "APACHE_KAFKA_HOST:APACHE_KAFKA_PORT";
private static final String DEFAULT_SCHEMA_REGISTRY_URL = "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT"; -
Within the
main
function, replace thebootstrapServers
andschemaRegistryUrl
default valuesfinal String bootstrapServers = args.length > 1 ? args[1] : DEFAULT_BOOTSTRAP_SERVERS;
final String schemaRegistryUrl = args.length > 2 ? args[2] : DEFAULT_SCHEMA_REGISTRY_URL; -
Within the
main
function, after the lineprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
define the keystore and truststore location and secrets for SSL connection, by replacing the placeholders
KEYSTORE_PATH
,TRUSTSTORE_PATH
andKEY_TRUST_SECRET
with the values set when creating the keystore and truststore.props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "TRUSTSTORE_PATH/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "KEYSTORE_PATH/client.keystore.p12");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "KEY_TRUST_SECRET"); -
Within the
main
function, replace the linefinal Map<String, String> serdeConfig = Collections.singletonMap(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);with the following, creating and configuring the
SpecificAvroSerdes
required, passing the schema registry username and password and substituting theSCHEMA_REGISTRY_USER
andSCHEMA_REGISTRY_PASSWORD
placeholdersfinal Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD");
Customize KafkaMusicExample.java
Similar changes need to be performed in the file
KafkaMusicExample.java
, replacing the placeholders with the connection
parameters fetched in the
prerequisite phase.
-
Add the following dependencies
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs; -
Change the
DEFAULT_BOOTSTRAP_SERVERS
andDEFAULT_SCHEMA_REGISTRY_URL
endpoints replacing theAPACHE_KAFKA_HOST
,APACHE_KAFKA_PORT
,APACHE_KAFKA_HOST
,SCHEMA_REGISTRY_PORT
placeholdersprivate static final String DEFAULT_BOOTSTRAP_SERVERS = "APACHE_KAFKA_HOST:APACHE_KAFKA_PORT";
private static final String DEFAULT_SCHEMA_REGISTRY_URL = "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT"; -
Replace the
bootstrapServers
andschemaRegistryUrl
default valuesfinal String bootstrapServers = args.length > 1 ? args[1] : DEFAULT_BOOTSTRAP_SERVERS;
final String schemaRegistryUrl = args.length > 2 ? args[2] : DEFAULT_SCHEMA_REGISTRY_URL; -
Within the
main
function, replace the linefinal KafkaStreams streams = new KafkaStreams(
buildTopology(singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)),
streamsConfig(bootstrapServers, restEndpointPort, "/tmp/kafka-streams", restEndpointHostname)
);with the following, creating and configuring the
SpecificAvroSerdes
required, passing the schema registry username and password and substituting theSCHEMA_REGISTRY_USER
andSCHEMA_REGISTRY_PASSWORD
placeholdersfinal Map<String, String> serdeConfig = new HashMap<>();
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD");
final KafkaStreams streams = new KafkaStreams(
buildTopology(serdeConfig),
streamsConfig(bootstrapServers, restEndpointPort, "/tmp/kafka-streams", restEndpointHostname)
); -
Within the
streamsConfig
static function, after the linestreamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
define the keystore and truststore location and secrets for SSL connection, by replacing the placeholders
KEYSTORE_PATH
,TRUSTSTORE_PATH
andKEY_TRUST_SECRET
with the values set when creating the keystore and truststore.streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "TRUSTSTORE_PATH/client.truststore.jks");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
streamsConfiguration.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PKCS12");
streamsConfiguration.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "KEYSTORE_PATH/client.keystore.p12");
streamsConfiguration.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
streamsConfiguration.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "KEY_TRUST_SECRET");
Build the applications
From the main kafka-streams-examples
folder, execute the following
Maven command to build the applications:
mvn -DskipTests=true clean package
The above command should create a jar
file named
kafka-streams-examples-<VERSION>-standalone.jar
under the target
folder, where <VERSION>
depends on the repository release number. When
using the 7.00
release the file name would be
kafka-streams-examples-7.0.0-standalone.jar
.
Run the applications
To run the applications in parallel, have two terminal
sessions pointing at the main kafka-streams-examples
folder.
From the first terminal session you can start the
KafkaMusicExampleDriver
producer with:
java -cp ./target/kafka-streams-examples-7.0.0-standalone.jar \
io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExampleDriver
Check the target folder of your project and change the version of the command if necessary.
From the second terminal session you can start the KafkaMusicExample
Kafka streams application with:
java -cp ./target/kafka-streams-examples-7.0.0-standalone.jar \
io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExample 7070
Change the port number 7070 to the actual Kafka Rest port of your Aiven for Apache Kafka service.