Create a MongoDB source connector for Aiven for Apache Kafka®
Use the MongoDB source connector to stream data from MongoDB collections into Apache Kafka® topics for processing and analytics.
The MongoDB source connector uses change streams to capture and emit changes at defined intervals. Instead of continuously polling the collection, the connector queries the change stream at a configurable interval to detect updates. For a log-based change data capture (CDC) approach, use the Debezium source connector for MongoDB.
Prerequisites
- An Aiven for Apache Kafka® service with Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service
- A MongoDB database and collection with accessible credentials
- The following MongoDB connection details:
- connection.uri: Connection URI in the format mongodb://USERNAME:PASSWORD@HOST:PORT
- database: Name of the MongoDB database
- collection: Name of the MongoDB collection
- A target Apache Kafka topic where the connector writes the data
- Access to one of the following setup methods: the Aiven Console, the Aiven CLI, or Terraform
- Authentication configured for your project (for example, set the AIVEN_API_TOKEN environment variable if using the CLI or Terraform)
The connector writes to a topic named DATABASE.COLLECTION.
Create the topic in advance or enable the auto_create_topic parameter in your Kafka
service.
Create a MongoDB source connector configuration file
Create a file named mongodb_source_config.json with the following configuration:
{
"name": "mongodb-source",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://USERNAME:PASSWORD@HOST:PORT",
"database": "DATABASE_NAME",
"collection": "COLLECTION_NAME",
"poll.await.time.ms": "5000",
"output.format.key": "json",
"output.format.value": "json",
"publish.full.document.only": "true"
}
Parameters:
- name: Name of the connector
- connector.class: Class name of the MongoDB source connector
- connection.uri: MongoDB connection URI with authentication
- database: Name of the MongoDB database to read from
- collection: Name of the MongoDB collection to stream from
- poll.await.time.ms: Interval in milliseconds for polling new changes. Default is 5000
- output.format.key and output.format.value: Format for the key and value of each Kafka record. Supported values: json, bson, schema
- publish.full.document.only: When true, only the changed document is published instead of the full change event
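Before submitting the connector, you can sanity-check the configuration file locally. The sketch below validates the required keys and the URI prefix of the configuration shown above; the key list and the regular expression are a minimal illustration, not a complete validator.

```python
import json
import re

# Required keys for the MongoDB source connector configuration shown above
REQUIRED_KEYS = {
    "name",
    "connector.class",
    "connection.uri",
    "database",
    "collection",
}

def validate_config(config: dict) -> list[str]:
    """Return a list of problems found in the connector configuration."""
    problems = [f"missing key: {key}" for key in sorted(REQUIRED_KEYS - config.keys())]
    uri = config.get("connection.uri", "")
    # Loose check for the mongodb://USERNAME:PASSWORD@HOST:PORT shape
    if uri and not re.match(r"^mongodb(\+srv)?://", uri):
        problems.append("connection.uri should start with mongodb:// or mongodb+srv://")
    return problems

config = json.loads("""
{
  "name": "mongodb-source",
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "connection.uri": "mongodb://USERNAME:PASSWORD@HOST:PORT",
  "database": "DATABASE_NAME",
  "collection": "COLLECTION_NAME"
}
""")

print(validate_config(config))  # [] means the basic checks passed
```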
Advanced options
For advanced use cases, such as schema inference, document filtering, or topic overrides, you can customize additional parameters. See the MongoDB Kafka connector documentation for the full list of available options.
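For example, document filtering uses the connector's pipeline option, which takes a MongoDB aggregation pipeline serialized as a JSON string. The sketch below generates a configuration file with a pipeline that keeps only insert events; the option name comes from the MongoDB Kafka connector documentation, and all other values are placeholders.

```python
import json

# Sketch: extend the base configuration with a "pipeline" option that keeps
# only insert events. The pipeline value is itself a JSON array, passed as
# a string inside the configuration.
config = {
    "name": "mongodb-source",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://USERNAME:PASSWORD@HOST:PORT",
    "database": "DATABASE_NAME",
    "collection": "COLLECTION_NAME",
    "pipeline": json.dumps([{"$match": {"operationType": "insert"}}]),
}

with open("mongodb_source_config.json", "w") as f:
    json.dump(config, f, indent=2)

print(config["pipeline"])  # [{"$match": {"operationType": "insert"}}]
```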
Create the connector
- Console
- CLI
- Terraform
- Access the Aiven Console.
- Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
- Click Connectors.
- Click Create connector if Kafka Connect is enabled on the service. If not, enable Kafka Connect under Service settings > Actions > Enable Kafka Connect.
- In the source connectors list, select MongoDB source connector, and click Get started.
- In the Common tab, locate the Connector configuration text box and click Edit.
- Paste the configuration from your mongodb_source_config.json file into the text box.
- Click Create connector.
- Verify the connector status on the Connectors page.
- Verify that data appears in the target Kafka topic.
By default, the connector writes to a topic named after the MongoDB database and
collection, for example,
districtA.students.
To create the MongoDB source connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @mongodb_source_config.json
Replace:
- SERVICE_NAME: Name of your Aiven for Apache Kafka or Kafka Connect service
- @mongodb_source_config.json: Path to your JSON configuration file
You can configure this connector using the
aiven_kafka_connector
resource in the Aiven Provider for Terraform.
Example:
resource "aiven_kafka_connector" "mongodb_source_connector" {
project = var.project_name
service_name = aiven_kafka.example_kafka.service_name
connector_name = "mongodb-source-connector"
config = {
"name" = "mongodb-source-connector"
"connector.class" = "com.mongodb.kafka.connect.MongoSourceConnector"
"connection.uri" = var.mongodb_connection_uri
"database" = "sample_airbnb"
"collection" = "listingsAndReviews"
"copy.existing" = "true"
"poll.await.time.ms" = "1000"
"output.format.value" = "json"
"output.format.key" = "json"
"publish.full.document.only" = "true"
}
}
Define variables such as project_name and mongodb_connection_uri in your Terraform configuration.
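A minimal sketch of those variable definitions, with names matching the example above and placeholder descriptions:

```hcl
variable "project_name" {
  type        = string
  description = "Aiven project name"
}

variable "mongodb_connection_uri" {
  type        = string
  description = "MongoDB connection URI, for example mongodb://USERNAME:PASSWORD@HOST:PORT"
  sensitive   = true
}
```

Marking the connection URI as sensitive keeps the credentials out of Terraform's plan output.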
Example: Create a MongoDB source connector
The following example shows how to create a MongoDB source connector that reads data
from the students collection in the districtA database and writes it to a Kafka
topic named districtA.students.
MongoDB collection (students):
{"name": "carlo", "age": 77}
{"name": "lucy", "age": 55}
{"name": "carlo", "age": 33}
Connector configuration:
{
"name": "mongodb-source-students",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://USERNAME:PASSWORD@HOST:PORT",
"database": "districtA",
"collection": "students",
"output.format.key": "json",
"output.format.value": "json",
"output.schema.infer.value": "true",
"poll.await.time.ms": "1000"
}
This configuration streams data from the students collection to the Kafka
topic districtA.students every second, based on the polling interval (poll.await.time.ms).
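To see what publish.full.document.only controls (the example configuration above leaves it unset, so full change events are emitted): a change stream event wraps the modified document in a fullDocument field alongside metadata such as operationType and ns. The event below is a hand-written sample in the shape MongoDB documents for insert events, not actual connector output.

```python
import json

# Hand-written sample insert event in the shape MongoDB change streams use;
# the field names (operationType, ns, fullDocument) come from MongoDB's docs.
event_json = """
{
  "operationType": "insert",
  "ns": {"db": "districtA", "coll": "students"},
  "fullDocument": {"name": "carlo", "age": 77}
}
"""

event = json.loads(event_json)

# With publish.full.document.only=false (the default), the whole event is
# published; with true, only the contents of fullDocument are published.
full_event_value = event
document_only_value = event["fullDocument"]

print(document_only_value)  # {'name': 'carlo', 'age': 77}
```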
Verify data flow
After you create the connector:
- Check the connector status on the Connectors page in the Aiven Console.
- Confirm that the Kafka topic districtA.students exists in your service.
- Consume messages from the topic to verify that data from MongoDB is streaming correctly.