Create a Stream Reactor sink connector from Apache Kafka® to Redis®*
The Redis®* Stream Reactor sink connector enables you to move data from an Aiven for Apache Kafka® cluster to a Redis®* database. It uses KCQL transformations to filter and map topic data before sending it to Redis.
In version 4.2.0 of the Redis Stream Reactor sink connector, a known issue with the GEOADD command may cause exceptions during initialization under specific configurations. For more information, see the GitHub issue.
Version compatibility
Stream Reactor version 9.0.2 includes class and package name changes introduced in version 6.0.0 by Lenses. These changes standardize connector class names and converter package names.
Version 9.x is not compatible with version 4.2.0. To continue using version 4.2.0, set the connector version before upgrading.
If you are upgrading from version 4.2.0, you must recreate the connector using the updated class name. For example:
"connector.class": "io.lenses.streamreactor.connect.<connector_path>.<ConnectorClassName>"
For details about the changes, see the Stream Reactor release notes.
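Based on this template, the updated class name for the Redis sink connector is expected to look like the following; verify the exact class name in the release notes:

"connector.class": "io.lenses.streamreactor.connect.redis.sink.RedisSinkConnector"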
Prerequisites
- An Aiven for Apache Kafka service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect cluster.
- Gather the following information for the target Redis database:
  - REDIS_HOSTNAME: The Redis hostname.
  - REDIS_PORT: The Redis port.
  - REDIS_PASSWORD: The Redis password.
  - REDIS_SSL: Set to true or false, depending on your SSL setup.
  - TOPIC_LIST: A comma-separated list of Kafka topics to sink.
  - KCQL_TRANSFORMATION: A KCQL statement to map topic fields to Redis cache entries (see the example after this list). Use the following format:

    INSERT INTO REDIS_CACHE
    SELECT LIST_OF_FIELDS
    FROM APACHE_KAFKA_TOPIC

  - APACHE_KAFKA_HOST: The Apache Kafka host. Required only when using Avro as the data format.
  - SCHEMA_REGISTRY_PORT: The schema registry port. Required only when using Avro.
  - SCHEMA_REGISTRY_USER: The schema registry username. Required only when using Avro.
  - SCHEMA_REGISTRY_PASSWORD: The schema registry password. Required only when using Avro.
- If you are using Aiven for Caching and Aiven for Apache Kafka, get all required connection details, including schema registry information, from the Connection information section on the service's Overview page.
As of version 3.0, Aiven for Apache Kafka uses Karapace as the schema registry and no longer supports the Confluent Schema Registry.
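For example, a KCQL statement for a hypothetical topic named orders, caching three of its fields under keys prefixed with order-, could look like this (the topic and field names are illustrative only):

INSERT INTO order- SELECT id, amount, currency FROM orders PK id

Here, PK id tells the connector to build each Redis key from the id field of the message.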
Create a connector configuration file
Create a file named redis_sink.json and add the following configuration:
{
"name": "CONNECTOR_NAME",
"connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
"topics": "TOPIC_LIST",
"connect.redis.host": "REDIS_HOSTNAME",
"connect.redis.port": "REDIS_PORT",
"connect.redis.password": "REDIS_PASSWORD",
"connect.redis.ssl.enabled": "REDIS_SSL",
"connect.redis.kcql": "KCQL_TRANSFORMATION",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
}
Parameters:
- name: The connector name. Replace CONNECTOR_NAME with your desired name.
- connect.redis.*: The Redis connection parameters collected in the prerequisite step.
- key.converter and value.converter: Define the message data format in the Kafka topic. This example uses io.confluent.connect.avro.AvroConverter to translate messages in Avro format. The schema is retrieved from Aiven's Karapace schema registry using the schema.registry.url and related credentials.

The key.converter and value.converter fields define how Kafka messages are parsed and must be included in the configuration.
When using Avro as the source format, set the following:
- value.converter.schema.registry.url: Use the Aiven for Apache Kafka schema registry URL in the format https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT.
- value.converter.basic.auth.credentials.source: Set to USER_INFO, which means authentication is done using a username and password.
- value.converter.schema.registry.basic.auth.user.info: Provide the schema registry credentials in the format SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD.

You can retrieve these values from the prerequisite step.
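For illustration, the Avro converter settings with a hypothetical host, port, and credentials filled in might look like this (all values below are placeholders, not defaults):

"value.converter.schema.registry.url": "https://my-kafka-project.a.aivencloud.com:13044",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "avnadmin:example-password"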
Create the connector
Console
1. Access the Aiven Console.
2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
3. Click Connectors.
4. Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.
   Alternatively, to enable connectors:
   - Click Service settings in the sidebar.
   - In the Service management section, click Actions > Enable Kafka connect.
5. In the sink connectors list, select Redis Sink Connector, and click Get started.
6. On the Stream Reactor Redis Sink page, go to the Common tab.
7. Locate the Connector configuration text box and click Edit.
8. Paste the configuration from your redis_sink.json file into the text box.
9. Click Create connector.
10. Verify the connector status on the Connectors page. You can also check it through the Kafka Connect REST API, as sketched after these steps.
11. Confirm that data is written to the Redis target database.
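For the REST API check, Kafka Connect exposes a standard status endpoint. A minimal sketch, assuming your Connect service host, port, and avnadmin credentials (all placeholders):

curl -s -u avnadmin:SERVICE_PASSWORD \
  "https://KAFKA_CONNECT_HOST:KAFKA_CONNECT_PORT/connectors/CONNECTOR_NAME/status"

A healthy connector reports a RUNNING state for both the connector and its tasks.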
Aiven CLI
To create the connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @redis_sink.json
Replace:
- SERVICE_NAME: Your Kafka or Kafka Connect service name.
- @redis_sink.json: The path to your configuration file.
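After creation, you can confirm the connector is running with the Aiven CLI, where CONNECTOR_NAME is the name field from redis_sink.json:

avn service connector list SERVICE_NAME
avn service connector status SERVICE_NAME CONNECTOR_NAME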
Sink topic data to Redis
The following example shows how to sink data from a Kafka topic to a Redis database.
If your Kafka topic students contains the following data:
{"id":1, "name":"carlo", "age": 77}
{"id":2, "name":"lucy", "age": 55}
{"id":3, "name":"carlo", "age": 33}
{"id":2, "name":"lucy", "age": 21}
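If you need to produce this test data first, one option is the kcat command-line tool. A minimal sketch, assuming client certificate authentication with files downloaded from the Aiven Console (the broker address, port, and file paths are placeholders):

kcat -P \
  -b APACHE_KAFKA_HOST:KAFKA_PORT \
  -X security.protocol=ssl \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.cert \
  -X ssl.ca.location=ca.pem \
  -t students <<'EOF'
{"id":1, "name":"carlo", "age": 77}
{"id":2, "name":"lucy", "age": 55}
{"id":3, "name":"carlo", "age": 33}
{"id":2, "name":"lucy", "age": 21}
EOF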
To write this data to Redis, use the following connector configuration:
{
"name": "my-redis-sink",
"connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
"connect.redis.host": "REDIS_HOSTNAME",
"connect.redis.port": "REDIS_PORT",
"connect.redis.password": "REDIS_PASSWORD",
"connect.redis.ssl.enabled": "REDIS_SSL",
"topics": "students",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"connect.redis.kcql": "INSERT INTO students- SELECT * FROM students PK id"
}
Replace all placeholder values (such as REDIS_HOSTNAME, REDIS_PORT, and REDIS_PASSWORD) with your actual Redis connection details.
This configuration does the following:
- "topics": "students": Specifies the Kafka topic to sink.
- Connection settings (connect.redis.*): Provide the Redis host, port, password, and SSL setting.
- "value.converter" and "value.converter.schemas.enable": Set the message format. The topic uses raw JSON without a schema.
- "connect.redis.kcql": Defines the insert logic. Each Kafka message is written as a key-value pair in Redis. The key is built from the id field and prefixed with students-.
After creating the connector, check your Redis database. You should see the following entries:
1. "students-1" containing "{\"name\":\"carlo\",\"id\":1,\"age\":77}"
2. "students-2" containing "{\"name\":\"lucy\",\"id\":2,\"age\":21}"
3. "students-3" containing "{\"name\":\"carlo\",\"id\":3,\"age\":33}"
Only three keys are present because two Kafka messages shared the same "id": 2, and Redis overwrites entries that use the same key.
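To verify this yourself, you can query the keys with redis-cli. A minimal sketch, assuming a TLS-enabled Redis service (the connection values are the same placeholders used in the configuration):

redis-cli --tls -h REDIS_HOSTNAME -p REDIS_PORT -a REDIS_PASSWORD --scan --pattern 'students-*'
redis-cli --tls -h REDIS_HOSTNAME -p REDIS_PORT -a REDIS_PASSWORD GET students-1

The first command lists the keys written by the connector; the second returns the JSON payload stored under students-1.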