Create a sink connector by Lenses.io from Apache Kafka® to MongoDB
The MongoDB Stream Reactor sink connector enables you to move data from an Aiven for Apache Kafka cluster to a MongoDB database. It uses KCQL transformations to filter and map topic data before sending it to MongoDB.
Aiven supports two different MongoDB sink connectors, each with its own implementation and configuration options. This document covers the connector by Lenses.io. For the MongoDB sink connector developed by MongoDB, see the related document.
Version compatibility
Stream Reactor version 9.0.2 includes class and package name changes introduced in version 6.0.0 by Lenses. These changes standardize connector class names and converter package names.
Version 9.x is not compatible with version 4.2.0. To continue using version 4.2.0, set the connector version before upgrading.
If you are upgrading from version 4.2.0, you must recreate the connector using the updated class name. For example:
"connector.class": "io.lenses.streamreactor.connect.<connector_path>.<ConnectorClassName>"
For details about the changes, see the Stream Reactor release notes.
Prerequisites
- An Aiven for Apache Kafka service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect cluster.
- Gather the following information for the target MongoDB database:
  - MONGODB_USERNAME: The MongoDB username.
  - MONGODB_PASSWORD: The MongoDB password.
  - MONGODB_HOST: The MongoDB hostname.
  - MONGODB_PORT: The MongoDB port.
  - MONGODB_DATABASE_NAME: The target MongoDB database name.
  - TOPIC_LIST: A comma-separated list of Kafka topics to sink.
  - KCQL_TRANSFORMATION: A KCQL statement to map topic data to MongoDB collections, in the following format (illustrative examples follow this list):

    INSERT | UPSERT INTO MONGODB_COLLECTION
    SELECT LIST_OF_FIELDS
    FROM APACHE_KAFKA_TOPIC

  - APACHE_KAFKA_HOST: The Apache Kafka host. Required only when using Avro.
  - SCHEMA_REGISTRY_PORT: The schema registry port. Required only when using Avro.
  - SCHEMA_REGISTRY_USER: The schema registry username. Required only when using Avro.
  - SCHEMA_REGISTRY_PASSWORD: The schema registry password. Required only when using Avro.

  Note: If you are using Aiven for MongoDB and Aiven for Apache Kafka, get all required connection details, including schema registry information, from the Connection information section on the service's Overview page.
  As of version 3.0, Aiven for Apache Kafka uses Karapace as the schema registry and no longer supports the Confluent Schema Registry.
- For a complete list of supported parameters and configuration options, see the connector's documentation.
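For illustration, KCQL statements in this format look like the following; they use the students topic and studentscol collection from the examples later in this document:

INSERT INTO studentscol SELECT * FROM students
UPSERT INTO studentscol SELECT * FROM students PK name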
Create a connector configuration file
Create a file named mongodb_sink.json and add the following configuration:
{
"name": "CONNECTOR_NAME",
"connector.class": "com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector",
"topics": "TOPIC_LIST",
"connect.mongo.connection": "mongodb://MONGODB_USERNAME:MONGODB_PASSWORD@MONGODB_HOST:MONGODB_PORT",
"connect.mongo.db": "MONGODB_DATABASE_NAME",
"connect.mongo.kcql": "KCQL_TRANSFORMATION",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
}
Parameters:
- name: The connector name. Replace CONNECTOR_NAME with your desired name.
- connect.mongo.*: The MongoDB connection parameters collected in the prerequisite step.
- key.converter and value.converter: Define the message data format in the Kafka topic. This example uses io.confluent.connect.avro.AvroConverter to translate messages in Avro format. The schema is retrieved from Aiven's Karapace schema registry using the schema.registry.url parameter and related credentials.

The key.converter and value.converter fields define how Kafka messages are parsed and must be included in the configuration.

When using Avro as the source format, set the following:
- value.converter.schema.registry.url: The Aiven for Apache Kafka schema registry URL, in the format https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT. Retrieve these values from the prerequisite step.
- value.converter.basic.auth.credentials.source: Set to USER_INFO, which means authentication is done with a username and password.
- value.converter.schema.registry.basic.auth.user.info: The schema registry credentials, in the format SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD. Retrieve these values from the prerequisite step as well.
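If the topic contains plain JSON messages without a schema instead of Avro, the schema registry settings are not needed. A minimal alternative, which the sink examples later in this document use, is:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"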
Create the connector
You can create the connector using the Aiven Console or the Aiven CLI.

Aiven Console
1. Go to the Aiven Console.
2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
3. Click Connectors.
4. Click Create connector if Kafka Connect is enabled on the service. If not, click Enable connector on this service.
   To enable connectors:
   - Click Service settings in the sidebar.
   - In the Service management section, click Actions > Enable Kafka Connect.
5. From the list of sink connectors, select Stream Reactor MongoDB Sink and click Get started.
6. On the Common tab, go to the Connector configuration text box and click Edit.
7. Paste the contents of your mongodb_sink.json configuration file into the text box.
8. Click Create connector.
9. Verify the connector status on the Connectors page.
10. Confirm that data appears in the target MongoDB collection. The collection name corresponds to the Kafka topic name defined in the KCQL statement.
Aiven CLI
To create the connector using the Aiven CLI, run:

avn service connector create SERVICE_NAME @mongodb_sink.json

Replace:
- SERVICE_NAME: Your Kafka or Kafka Connect service name.
- @mongodb_sink.json: The path to your configuration file.
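Once the connector is created, you can check its state from the CLI as well, for example:

avn service connector status SERVICE_NAME CONNECTOR_NAME

Replace CONNECTOR_NAME with the name you set in mongodb_sink.json.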
Sink topic data to MongoDB
The following examples show how to sink data from a Kafka topic to a MongoDB collection.
Insert mode
If the Kafka topic students
contains the following records:
{"name": "carlo", "age": 77}
{"name": "lucy", "age": 55}
{"name": "carlo", "age": 33}
Use the following connector configuration to insert each record into a MongoDB collection named studentscol:
{
"name": "my-mongodb-sink",
"connector.class": "com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector",
"connect.mongo.connection": "mongodb://MONGODB_USERNAME:MONGODB_PASSWORD@MONGODB_HOST:MONGODB_PORT",
"connect.mongo.db": "MONGODB_DB_NAME",
"topics": "students",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"connect.mongo.kcql": "INSERT INTO studentscol SELECT * FROM students"
}
Replace all placeholders (such as MONGODB_HOST, MONGODB_DB_NAME, and MONGODB_USERNAME) with your actual MongoDB connection details.
This configuration does the following:
- "topics": "students": Specifies the Kafka topic to sink.
- Connection settings (connect.mongo.*): Provide the MongoDB host, port, credentials, and database name.
- "value.converter" and "value.converter.schemas.enable": Set the message format. The topic uses raw JSON without a schema.
- "connect.mongo.kcql": Defines the insert logic. Each Kafka message is written as a new document in the studentscol MongoDB collection.
After creating the connector, check the MongoDB collection to verify that the data
has been inserted. You should see three documents in the studentscol
collection.
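For example, a quick check with mongosh against the configured database could look like this; the expected result is noted in the comments:

// list the sunk documents, hiding MongoDB's generated _id field
db.studentscol.find({}, {_id: 0})
// expected: the three records from the students topic shown above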
Upsert mode
To ensure only one document per unique name is stored, use upsert mode. If
the students
topic contains:
{"name": "carlo", "age": 77}
{"name": "lucy", "age": 55}
{"name": "carlo", "age": 33}
Use the following connector configuration:
{
"name": "my-mongodb-sink",
"connector.class": "com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector",
"connect.mongo.connection": "mongodb://MONGODB_USERNAME:MONGODB_PASSWORD@MONGODB_HOST:MONGODB_PORT",
"connect.mongo.db": "MONGODB_DB_NAME",
"topics": "students",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"connect.mongo.kcql": "UPSERT INTO studentscol SELECT * FROM students PK name"
}
This configuration performs the following:
- Uses the same connection and converter settings as the insert mode example.
- "connect.mongo.kcql": Uses UPSERT logic with PK name to update the existing document based on the name field.
After the connector runs, the studentscol collection contains two documents, because the two records with "name": "carlo" are upserted into a single document that keeps the latest values:
{"name": "lucy", "age": 55}
{"name": "carlo", "age": 33}