Create a MongoDB sink connector (Lenses.io) for Aiven for Apache Kafka®
Use the MongoDB sink connector by Lenses.io to write data from Apache Kafka® topics into a MongoDB database. This connector supports KCQL transformations to filter and map topic data before inserting it into MongoDB.
Aiven supports two MongoDB sink connectors with different capabilities: this connector by Lenses.io, and the MongoDB Kafka sink connector maintained by MongoDB. This page covers the Lenses.io connector.
Version compatibility
Stream Reactor version 9.0.2 includes class and package name updates introduced in version 6.0.0 by Lenses to standardize connector and converter names.
Version 9.x is not compatible with version 4.2.0. To continue using version 4.2.0, set the connector version before you upgrade.
If you upgrade from version 4.2.0, recreate the connector using the updated class name. For example:
"connector.class": "io.lenses.streamreactor.connect.<connector_path>.<ConnectorClassName>"
For details about these changes, see the Stream Reactor release notes.
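For example, assuming the package rename described in the release notes applies to this connector, the MongoDB sink class name in version 9.x is:
"connector.class": "io.lenses.streamreactor.connect.mongodb.sink.MongoSinkConnector"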
Prerequisites
- An Aiven for Apache Kafka® service with Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service
- A MongoDB database with accessible connection credentials
- The following MongoDB and Kafka connection details:
  - MONGODB_USERNAME: MongoDB username
  - MONGODB_PASSWORD: MongoDB password
  - MONGODB_HOST: MongoDB hostname
  - MONGODB_PORT: MongoDB port
  - MONGODB_DATABASE_NAME: MongoDB database name
  - TOPIC_LIST: Comma-separated list of Kafka topics to sink
  - KCQL_TRANSFORMATION: KCQL mapping statement. For example:
    INSERT INTO MONGODB_COLLECTION SELECT * FROM APACHE_KAFKA_TOPIC
  - Schema Registry details (required only for Avro format):
    - APACHE_KAFKA_HOST: Kafka host
    - SCHEMA_REGISTRY_PORT: Schema Registry port
    - SCHEMA_REGISTRY_USER: Schema Registry username
    - SCHEMA_REGISTRY_PASSWORD: Schema Registry password
- Access to one of the setup methods described below (Aiven Console or Aiven CLI)
- Authentication configured for your Aiven project (for example, set the AIVEN_API_TOKEN environment variable if using the CLI or Terraform)
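The Apache Kafka and Schema Registry values are available in the service's connection information. As a minimal sketch using the Aiven CLI (SERVICE_NAME is a placeholder for your service name):
avn service get SERVICE_NAME --json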
Create a MongoDB sink connector configuration file
Create a file named mongodb_sink.json with the following configuration:
{
"name": "mongodb-sink",
"connector.class": "com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector",
"topics": "TOPIC_LIST",
"connect.mongo.connection": "mongodb://MONGODB_USERNAME:MONGODB_PASSWORD@MONGODB_HOST:MONGODB_PORT",
"connect.mongo.db": "MONGODB_DATABASE_NAME",
"connect.mongo.kcql": "KCQL_TRANSFORMATION",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
}
Parameters:
- name: Connector name
- connector.class: Class name of the MongoDB sink connector
- connect.mongo.connection: MongoDB connection URI with credentials
- connect.mongo.db: MongoDB database name
- connect.mongo.kcql: KCQL statement defining how Kafka topic data maps to MongoDB
- key.converter and value.converter: Configure the data format and schema registry details
- topics: Kafka topics to sink data from
The key.converter and value.converter fields define how Kafka messages are parsed
and must be included in the configuration.
When using Avro as the source format, configure the following parameters:
- value.converter.schema.registry.url: Schema Registry URL in the format https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT. Retrieve these values from the prerequisites.
- value.converter.basic.auth.credentials.source: Set to USER_INFO to authenticate with a username and password.
- value.converter.schema.registry.basic.auth.user.info: Schema Registry credentials in the format SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD. Retrieve these values from the prerequisites.
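If the topic contains plain JSON without a schema, you can use the JSON converter instead of Avro and omit the Schema Registry settings, as in the examples later on this page:
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"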
For advanced configurations such as batch size, write strategy, or custom transformations, see the Lenses.io MongoDB sink connector reference.
Create the connector
- Console
- CLI
- Go to the Aiven Console.
- Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
- Click Connectors.
- Click Create connector if Kafka Connect is enabled on the service. If not, enable it under Service settings > Actions > Enable Kafka Connect (a CLI alternative is sketched after these steps).
- From the list of sink connectors, select Stream Reactor MongoDB Sink, and click Get started.
- In the Common tab, find the Connector configuration text box and click Edit.
- Paste the configuration from your mongodb_sink.json file.
- Click Create connector.
- Verify the connector status on the Connectors page.
- Confirm that data appears in the target MongoDB collection. The collection name corresponds to the KCQL mapping.
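If you prefer the CLI, Kafka Connect can also be enabled on an existing Aiven for Apache Kafka service with a configuration update. A minimal sketch (SERVICE_NAME is a placeholder):
avn service update SERVICE_NAME -c kafka_connect=true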
To create the connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @mongodb_sink.json
Replace:
- SERVICE_NAME: Name of your Aiven for Apache Kafka or Kafka Connect service
- @mongodb_sink.json: Path to your configuration file
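To verify the connector from the CLI, you can list the connectors on the service and check the status, assuming the connector name mongodb-sink from the configuration file:
avn service connector list SERVICE_NAME
avn service connector status SERVICE_NAME mongodb-sink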
Example: Sink data to MongoDB
The following examples show how to write Kafka topic data to MongoDB collections using KCQL transformations.
Insert mode
If the Kafka topic students contains:
{"name": "carlo", "age": 77}
{"name": "lucy", "age": 55}
{"name": "carlo", "age": 33}
Use this configuration to insert all records into a MongoDB collection named studentscol:
{
"name": "mongodb-sink-insert",
"connector.class": "com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector",
"topics": "students",
"connect.mongo.connection": "mongodb://MONGODB_USERNAME:MONGODB_PASSWORD@MONGODB_HOST:MONGODB_PORT",
"connect.mongo.db": "MONGODB_DATABASE_NAME",
"connect.mongo.kcql": "INSERT INTO studentscol SELECT * FROM students",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
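Because insert mode writes each record as a separate document, the studentscol collection then contains all three records, including both carlo entries (MongoDB also adds an _id field to each document, omitted here):
{"name": "carlo", "age": 77}
{"name": "lucy", "age": 55}
{"name": "carlo", "age": 33}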
Upsert mode
To ensure only one document per unique name is stored, use upsert mode:
{
"name": "mongodb-sink-upsert",
"connector.class": "com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector",
"topics": "students",
"connect.mongo.connection": "mongodb://MONGODB_USERNAME:MONGODB_PASSWORD@MONGODB_HOST:MONGODB_PORT",
"connect.mongo.db": "MONGODB_DATABASE_NAME",
"connect.mongo.kcql": "UPSERT INTO studentscol SELECT * FROM students PK name",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
This configuration updates existing records based on the name field instead of
inserting duplicates.
After the connector runs, the MongoDB collection studentscol contains:
{"name": "lucy", "age": 55}
{"name": "carlo", "age": 33}