Use Apache Kafka® Streams with Aiven for Apache Kafka®
Apache Kafka® Streams is a client-side library for building real-time applications where input and output data are stored in Kafka clusters.
Kafka Streams enables you to build scalable, fault-tolerant applications that process data streams. It reads from one or more input sources (such as Kafka topics) and writes to a sink (such as an output Kafka topic). You write Kafka Streams applications in Java or Scala.
The following example shows how to use Kafka Streams with Aiven for Apache Kafka® and Karapace schema registry to filter Apache Avro™ messages.
The example uses data from the Sample Data Generator for Logistics,
which writes to the `logistics_data_gen` topic. The example code reads from
that topic and writes filtered data to the `logistics_data_delivered` topic:

- Writes messages where the `state` is `Delivered`.
- Copies the `carrier` and `manifest` fields, and renames `time_utc` to
  `timeUtc` and `tracking_id` to `trackingId`. Other fields are not copied.
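The filter and rename step above can be sketched as a plain Java function over a map of field names to values, used here as a simplified stand-in for the Avro record. The class and method names are illustrative, not the repository's actual code:

```java
import java.util.HashMap;
import java.util.Map;

public class DeliveredFilterSketch {

    // Keep only records whose state is "Delivered", copy carrier and
    // manifest, and rename time_utc -> timeUtc and tracking_id -> trackingId.
    // All other input fields are intentionally dropped.
    static Map<String, Object> filterAndRename(Map<String, Object> record) {
        if (!"Delivered".equals(record.get("state"))) {
            return null; // filtered out: not yet delivered
        }
        Map<String, Object> out = new HashMap<>();
        out.put("carrier", record.get("carrier"));
        out.put("manifest", record.get("manifest"));
        out.put("timeUtc", record.get("time_utc"));
        out.put("trackingId", record.get("tracking_id"));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> rec = new HashMap<>();
        rec.put("state", "Delivered");
        rec.put("carrier", "ACME");
        rec.put("manifest", "box of parts");
        rec.put("time_utc", "2024-01-01T00:00:00Z");
        rec.put("tracking_id", "T-123");
        rec.put("weight_kg", 4.2); // extra field: not copied to the output
        System.out.println(filterAndRename(rec));
    }
}
```

In the real application the same logic runs inside a Kafka Streams topology over Avro records rather than over plain maps.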
The Avro messages in this example use the Confluent wire format. In this
format, a schema ID is inserted before each message value. This format is
sometimes referred to as `AvroConfluent`.
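As an illustration, the schema ID can be read from a wire-format message value with a few lines of standard Java. In this format, byte 0 is a magic byte (zero) and bytes 1 to 4 hold the schema ID as a big-endian 32-bit integer, followed by the Avro-encoded payload. The class and method names here are hypothetical, not part of the example application:

```java
import java.nio.ByteBuffer;

public class WireFormatSketch {

    // Extract the schema ID from a Confluent wire-format message value.
    static int schemaId(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte magic = buf.get(); // byte 0: magic byte, must be 0
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt(); // bytes 1-4: schema ID, big-endian
    }

    public static void main(String[] args) {
        // A fabricated header for schema ID 42, followed by a dummy payload.
        byte[] value = {0, 0, 0, 0, 42, /* payload */ 1, 2, 3};
        System.out.println(schemaId(value)); // prints 42
    }
}
```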
The input message schema is retrieved from the schema registry. The output
schema is defined in `logistics_delivered.avsc`, compiled into the Java
application, and registered with the schema registry.
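As a rough sketch, an output schema for these fields could look like the following. The field names follow the description above, but the types and record name are assumptions; the authoritative schema is `logistics_delivered.avsc` in the repository:

```json
{
  "type": "record",
  "name": "LogisticsDelivered",
  "fields": [
    {"name": "timeUtc", "type": "string"},
    {"name": "trackingId", "type": "string"},
    {"name": "carrier", "type": "string"},
    {"name": "manifest", "type": "string"}
  ]
}
```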
Prerequisites
You can run this example with any Apache Kafka service. The steps below use an Aiven for Apache Kafka® service.
Schema registry
This example requires a schema registry. The producer registers the Avro schema to obtain a schema ID, which is added to each message. The consumer retrieves the schema from the registry to decode the message.
Enable the Karapace schema registry for the Kafka service. See Enable Karapace schema registry.
Environment variables
Create the following environment variables to connect to the Aiven for Apache Kafka and Karapace services:
- `KAFKA_SERVICE_URL`: Service URL of the Kafka service
- `SCHEMA_REGISTRY_URL`: Service URI of the schema registry
- `SCHEMA_REGISTRY_USERNAME`: Username for the schema registry
- `SCHEMA_REGISTRY_PASSWORD`: Password for the schema registry
You can find these values in Connection information on the
Overview page in the Aiven console or
by running avn service get with the
Aiven CLI. You can also download the
certificate files used in the next step from this section.
Certificates
Create a directory named certs and download the following files to this directory:
- Access key (`service.key`)
- Access certificate (`service.cert`)
- CA certificate (`ca.pem`)
Kafka topic
Create the output topic `logistics_data_delivered` in the Kafka service. The
sample data generator automatically creates the input topic.
Local tools
Install one of the following to run the example:
- Docker to run the application in a container
- Gradle to build and run the application locally using the `run.sh` script
Get the example application code
- Clone the `kafka-streams-example` repository from GitHub:

  git clone https://github.com/Aiven-Labs/kafka-streams-example.git

- Change into the repository directory:

  cd kafka-streams-example
Start the Logistics data stream
Follow the instructions in Stream sample data from the Aiven Console to start the Logistics data generator.
Run the example application
Set the following environment variables:
- `CA_PEM_CONTENTS`: Contents of the `ca.pem` file
- `SERVICE_CERT_CONTENTS`: Contents of the `service.cert` file
- `SERVICE_KEY_CONTENTS`: Contents of the `service.key` file
Set these variables by sourcing the prep_cert_env.sh script in the cloned
repository:
source prep_cert_env.sh
Run with Docker
- Build the container image for the `GenericFilterApp` example:

  docker build --build-arg APP_NAME=GenericFilterApp -t appimage .

- Run the container using the environment variables set earlier:
docker run -d --name kafka-streams-container -p 3000:3000 \
-e KAFKA_SERVICE_URL=$KAFKA_SERVICE_URL \
-e CA_PEM_CONTENTS="$CA_PEM_CONTENTS" \
-e SERVICE_CERT_CONTENTS="$SERVICE_CERT_CONTENTS" \
-e SERVICE_KEY_CONTENTS="$SERVICE_KEY_CONTENTS" \
-e SCHEMA_REGISTRY_URL=$SCHEMA_REGISTRY_URL \
-e SCHEMA_REGISTRY_USERNAME=$SCHEMA_REGISTRY_USERNAME \
-e SCHEMA_REGISTRY_PASSWORD=$SCHEMA_REGISTRY_PASSWORD \
appimage
Build and run the application locally:
- Build the application. This command creates a fat JAR.

  gradle GenericFilterAppUberJar

- Copy the JAR file to the current directory so the `run.sh` script can find it:

  cp app/build/libs/GenericFilterApp-uber.jar .

- Run the application using the `run.sh` script:

  APP_NAME=GenericFilterApp ./run.sh

  The script uses the environment variables set earlier.
The Docker image uses the same run.sh script.
Check the produced data
In the Aiven console
- In the Aiven console, open the Aiven for Apache Kafka® service.
- In the sidebar, click Topics.
- Select the `logistics_data_delivered` topic.
- Click Messages.
- In Format, select `Avro`.
- Click Fetch messages.
With Python

The `reporting` directory contains the command-line program
`report_messages.py`, which reads messages from the input and output topics
and displays them in the terminal.

Install uv to run the script.

After installing uv and setting the environment variables from earlier steps,
run the script:

reporting/report_messages.py
About the example code
The example repository contains source code for several applications. This example
focuses on GenericFilterApp.java.
See the example repository README for additional details and other sample programs.