Create a source connector from MongoDB to Apache Kafka®
The MongoDB source connector copies new documents from MongoDB collections to Apache Kafka®, where they can be transformed and read by multiple consumers.
The MongoDB source connector uses change streams to capture changes in MongoDB data at set intervals. Rather than directly polling the collection, the connector pulls new changes from a change stream using a query-based approach. You can set the polling interval as a parameter to determine how often changes are emitted from the stream. For a log-based change data capture method, use the Debezium source connector for MongoDB instead.
See the full set of available parameters and configuration options in the connector's documentation.
Prerequisites
To set up a MongoDB source connector, you need an Aiven for Apache Kafka service with Kafka Connect enabled or a dedicated Aiven for Apache Kafka Connect cluster.
The connector writes to a topic named DATABASE.COLLECTION, so either create the topic in your Kafka service or enable the auto_create_topic parameter so that the topic is created automatically.
Also collect the following information about the source MongoDB database upfront:
- MONGODB_CONNECTION_URI: The MongoDB database connection URL in the format mongodb://USERNAME:PASSWORD@HOST:PORT where:
  - USERNAME: The database username to connect with
  - PASSWORD: The password for the selected username
  - HOST: The MongoDB hostname
  - PORT: The MongoDB port
- MONGODB_DATABASE_NAME: The name of the MongoDB database
- MONGODB_COLLECTION_NAME: The name of the MongoDB collection
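As a minimal sketch of assembling the connection URI from the values above: the helper name build_mongodb_uri and the sample values are hypothetical and not part of the connector. The one detail worth showing is that special characters in the username or password (such as @, : or /) need to be percent-encoded, otherwise they are read as URI delimiters.

```python
from urllib.parse import quote_plus


def build_mongodb_uri(username: str, password: str, host: str, port: int) -> str:
    """Return a mongodb:// URI with the credentials percent-encoded.

    Hypothetical helper for illustration; it only formats the string.
    """
    # Escape characters such as '@', ':' and '/' in the credentials so
    # they are not mistaken for URI delimiters.
    user = quote_plus(username)
    secret = quote_plus(password)
    return f"mongodb://{user}:{secret}@{host}:{port}"


if __name__ == "__main__":
    # Placeholder values, not real credentials.
    print(build_mongodb_uri("app_user", "p@ss:word/1", "mongo.example.com", 27017))
```

The resulting string is the value to use for MONGODB_CONNECTION_URI in the connector configuration.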
The complete list of parameters and customization options is available in the dedicated MongoDB documentation.
Set up a MongoDB source connector with Aiven Console
The following example demonstrates how to set up an Apache Kafka MongoDB source connector using the Aiven Console.
Define a Kafka Connect configuration file
Define the connector configuration in a file (referred to here as mongodb_source.json) with the following content. Creating a file is not strictly necessary, but it keeps all the information in one place before you copy and paste it into the Aiven Console:
{
    "name": "CONNECTOR_NAME",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "MONGODB_CONNECTION_URI",
    "database": "MONGODB_DATABASE_NAME",
    "collection": "MONGODB_COLLECTION_NAME",
    "poll.await.time.ms": "POLL_INTERVAL",
    "output.format.value": "VALUE_OUTPUT_FORMAT",
    "output.format.key": "KEY_OUTPUT_FORMAT",
    "publish.full.document.only": "true"
}
The configuration file contains the following entries:
- name: the connector name. Replace CONNECTOR_NAME with the name to give to the connector.
- connection.uri, database, collection: the source database parameters collected in the prerequisite phase.
- poll.await.time.ms: the polling period, that is, the time in milliseconds between two consecutive queries for new changes. Defaults to 5000.
- output.format.value and output.format.key: the output format of the data produced by the connector for the value and the key. Supported formats are:
  - json: raw JSON strings
  - bson: Binary JavaScript Object Notation (BSON) byte array
  - schema: Avro schema output. With this option, an additional parameter (output.schema.key or output.schema.value) must be passed to define the document schema.
- publish.full.document.only: publishes only the actual document rather than the full change stream document, which includes additional metadata. Defaults to false.
See the dedicated documentation for the full list of parameters.
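The entries described above can also be assembled programmatically. The following is a rough sketch, not an official tool: the function names build_source_config and write_config, their defaults, and the sample values are assumptions made for illustration. It checks the output formats against the three values listed above and writes every setting as a string, as in the template.

```python
import json
from pathlib import Path

# Output formats listed in this guide.
SUPPORTED_FORMATS = {"json", "bson", "schema"}


def build_source_config(name, connection_uri, database, collection,
                        poll_ms=5000, key_format="json", value_format="json",
                        full_document_only=True):
    """Return the connector configuration as a dict of strings.

    Hypothetical helper. full_document_only defaults to True here to
    mirror the template above; the connector's own default is false.
    """
    for fmt in (key_format, value_format):
        if fmt not in SUPPORTED_FORMATS:
            raise ValueError(f"unsupported output format: {fmt!r}")
    return {
        "name": name,
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": connection_uri,
        "database": database,
        "collection": collection,
        "poll.await.time.ms": str(poll_ms),
        "output.format.value": value_format,
        "output.format.key": key_format,
        "publish.full.document.only": str(full_document_only).lower(),
    }


def write_config(config, path="mongodb_source.json"):
    """Write the configuration to a JSON file for pasting into the console."""
    Path(path).write_text(json.dumps(config, indent=4) + "\n", encoding="utf-8")


if __name__ == "__main__":
    # Placeholder values for illustration only.
    config = build_source_config(
        "my-mongodb-source",
        "mongodb://USERNAME:PASSWORD@HOST:PORT",
        "MONGODB_DATABASE_NAME",
        "MONGODB_COLLECTION_NAME",
    )
    write_config(config)
```

Validating the formats before writing the file catches a mistyped value early, rather than when the connector is created.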
Create a Kafka Connect connector with the Aiven Console
To create a Kafka Connect connector:
- Log in to the Aiven Console and select the Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service where the connector needs to be defined.
- Select Connectors from the left sidebar.
- Select Create New Connector. This option is enabled only for services with Kafka Connect enabled.
- Select MongoDB Kafka Source Connector.
- In the Common tab, locate the Connector configuration text box and select Edit.
- Paste the connector configuration (stored in the mongodb_source.json file) in the form.
- Select Apply.
  Note: The Aiven Console parses the configuration file and fills the relevant UI fields. You can review the UI fields across the various tabs and change them if necessary. The changes are reflected in JSON format in the Connector configuration text box.
- After all the settings are correctly configured, select Create connector.
- Verify the connector status under the Connectors screen.
- Verify the presence of the data in the target Apache Kafka topic. The topic name is the MongoDB database name and the collection name joined by a dot (DATABASE.COLLECTION).
You can also create connectors using the Aiven CLI command.
Example: Create a MongoDB source connector
Suppose MongoDB has a collection named students in a database named districtA, containing the following data that you want to move to Apache Kafka:
{"name":"carlo", "age": 77}
{"name":"lucy", "age": 55}
{"name":"carlo", "age": 33}
You can create a source connector that copies the students MongoDB collection to Apache Kafka with the following connector configuration, after replacing the placeholders for MONGODB_HOST, MONGODB_PORT, MONGODB_DB_NAME, MONGODB_USERNAME and MONGODB_PASSWORD:
{
    "name": "my-mongodb-source",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://MONGODB_USERNAME:MONGODB_PASSWORD@MONGODB_HOST:MONGODB_PORT",
    "database": "MONGODB_DB_NAME",
    "collection": "students",
    "output.format.key": "json",
    "output.format.value": "json",
    "output.schema.infer.value": "true",
    "poll.await.time.ms": "1000"
}
The configuration file contains the following notable settings:
- "collection": "students": sets the collection to use as the source.
- "database": "MONGODB_DB_NAME": the database used is the one referenced by the placeholder MONGODB_DB_NAME.
- "output.format.key" and "output.format.value": both are set to produce messages in JSON format.
- "output.schema.infer.value": "true": the schema is automatically inferred.
- "poll.await.time.ms": "1000": a polling time of one second.
Once the connector is created successfully, you should see a topic named MONGODB_DB_NAME.students in Aiven for Apache Kafka.
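To make the placeholder substitution and the resulting topic name concrete, here is a small sketch. The helpers fill_placeholders and expected_topic and the sample values are hypothetical, and the topic name assumes the DATABASE.COLLECTION naming described in this guide with no other topic-related settings configured.

```python
# Subset of the example configuration that contains placeholders.
EXAMPLE_CONFIG = {
    "connection.uri": "mongodb://MONGODB_USERNAME:MONGODB_PASSWORD@MONGODB_HOST:MONGODB_PORT",
    "database": "MONGODB_DB_NAME",
    "collection": "students",
}

# Illustrative values only; replace with your own.
SAMPLE_VALUES = {
    "MONGODB_USERNAME": "app_user",
    "MONGODB_PASSWORD": "secret",
    "MONGODB_HOST": "mongo.example.com",
    "MONGODB_PORT": "27017",
    "MONGODB_DB_NAME": "districtA",
}


def fill_placeholders(config: dict, values: dict) -> dict:
    """Return a copy of config with each placeholder name replaced."""
    filled = {}
    for key, value in config.items():
        for placeholder, replacement in values.items():
            value = value.replace(placeholder, replacement)
        filled[key] = value
    return filled


def expected_topic(config: dict) -> str:
    """Return the DATABASE.COLLECTION topic name for a configuration."""
    return f"{config['database']}.{config['collection']}"


if __name__ == "__main__":
    filled = fill_placeholders(EXAMPLE_CONFIG, SAMPLE_VALUES)
    print(filled["connection.uri"])
    print(expected_topic(filled))
```

With the sample values, the database placeholder resolves to districtA, so the topic to look for would be districtA.students.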