Create a source connector from MQTT to Apache Kafka®
The Stream Reactor MQTT source connector transfers messages from an MQTT topic to an Aiven for Apache Kafka® topic, where they can be processed and consumed by multiple applications.
It creates a queue and binds it to the amq.topic exchange specified in the KCQL statement, then forwards the messages to Kafka.
You can use this connector to source messages from RabbitMQ® if the RabbitMQ MQTT plugin is enabled.
Version compatibility
Stream Reactor version 9.0.2 includes class and package name changes introduced in version 6.0.0 by Lenses. These changes standardize connector class names and converter package names.
Version 9.x is not compatible with version 4.2.0. To continue using version 4.2.0, set the connector version before upgrading.
If you are upgrading from version 4.2.0, you must recreate the connector using the updated class name. For example:
"connector.class": "io.lenses.streamreactor.connect.<connector_path>.<ConnectorClassName>"
For details about the changes, see the Stream Reactor release notes.
Prerequisites
- An Aiven for Apache Kafka service with Apache Kafka Connect enabled or a dedicated Aiven for Apache Kafka Connect cluster.
- Collect the following details for your MQTT source:
  - HOST: The MQTT hostname.
  - PORT: The MQTT port (usually 1883).
  - USERNAME: The MQTT username.
  - PASSWORD: The MQTT password.
  - KCQL_STATEMENT: A KCQL mapping from MQTT to Kafka in the format: INSERT INTO <your-kafka-topic> SELECT * FROM <your-mqtt-topic>
  - APACHE_KAFKA_HOST: The Kafka host (required only when using Avro).
  - SCHEMA_REGISTRY_PORT: The schema registry port (required only when using Avro).
  - SCHEMA_REGISTRY_USER: The schema registry username (required only when using Avro).
  - SCHEMA_REGISTRY_PASSWORD: The schema registry password (required only when using Avro).
- The connector writes to a Kafka topic defined in the connect.mqtt.kcql parameter. Make sure the topic exists, or enable automatic topic creation.
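The KCQL format listed above can be assembled programmatically. The following is a minimal sketch, not part of the connector: the helper name build_kcql is hypothetical, and it only covers the simple INSERT INTO ... SELECT * FROM ... form described in the prerequisites.

```python
def build_kcql(kafka_topic: str, mqtt_topic: str) -> str:
    """Return a KCQL mapping in the simple format shown in the prerequisites.

    build_kcql is an illustrative helper, not a Stream Reactor API.
    """
    if not kafka_topic or not mqtt_topic:
        raise ValueError("both the Kafka topic and the MQTT topic are required")
    return f"INSERT INTO {kafka_topic} SELECT * FROM {mqtt_topic}"


# Example values; replace them with your own topic names.
statement = build_kcql("device_status", "devices/status")
print(statement)
```

The resulting string is the value to use for KCQL_STATEMENT.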
Create a connector configuration file
Create a file named mqtt_source.json and add the following configuration:
{
    "name": "CONNECTOR_NAME",
    "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
    "connect.mqtt.hosts": "tcp://HOST:PORT",
    "connect.mqtt.kcql": "KCQL_STATEMENT",
    "connect.mqtt.username": "USERNAME",
    "connect.mqtt.password": "PASSWORD",
    "connect.mqtt.service.quality": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
Parameters:
- name: The connector name. Replace CONNECTOR_NAME with your desired name.
- connect.mqtt.*: MQTT connection details collected in the prerequisite step.
- connect.mqtt.kcql: A KCQL statement that maps MQTT topics to Kafka topics.
- connect.mqtt.service.quality: Sets the MQTT Quality of Service (QoS). Common values are 0, 1, or 2.
- key.converter and value.converter: Set the message format. This example uses raw JSON.
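If you prefer to generate the file rather than edit it by hand, the parameters above can be filled in with a short script. This is a sketch under assumptions: the function names build_mqtt_source_config and write_config are hypothetical, the sample values are placeholders, and the connector class is copied from the configuration shown above.

```python
import json
from pathlib import Path


def build_mqtt_source_config(name, host, port, username, password, kcql, qos=1):
    """Build the connector configuration as a dict (illustrative helper)."""
    if qos not in (0, 1, 2):
        raise ValueError("MQTT QoS must be 0, 1, or 2")
    return {
        "name": name,
        # Class name as used in the configuration above.
        "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
        "connect.mqtt.hosts": f"tcp://{host}:{port}",
        "connect.mqtt.kcql": kcql,
        "connect.mqtt.username": username,
        "connect.mqtt.password": password,
        # Connector properties are passed as strings.
        "connect.mqtt.service.quality": str(qos),
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    }


def write_config(config, path="mqtt_source.json"):
    """Write the configuration to a JSON file."""
    Path(path).write_text(json.dumps(config, indent=4) + "\n", encoding="utf-8")


# Placeholder values; replace them with the details from the prerequisites.
config = build_mqtt_source_config(
    name="CONNECTOR_NAME",
    host="HOST",
    port=1883,
    username="USERNAME",
    password="PASSWORD",
    kcql="INSERT INTO device_status SELECT * FROM devices/status",
)
print(json.dumps(config, indent=4))
```

Calling write_config(config) saves the result as mqtt_source.json in the current directory.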
Create the connector
- Console
- CLI
- Access the Aiven Console.
- Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
- Click Connectors.
- Click Create connector. If Kafka Connect is not yet enabled, click Enable connector on this service.
  Alternatively:
  - Go to Service settings.
  - In the Service management section, click Actions > Enable Kafka connect.
- From the source connectors list, select Stream Reactor MQTT Source, then click Get started.
- On the Common tab, locate the Connector configuration text box and click Edit.
- Paste the contents of your mqtt_source.json file.
- Click Create connector.
- Verify the connector status on the Connectors page.
To create the connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @mqtt_source.json
Replace:
- SERVICE_NAME: Your Kafka or Kafka Connect service name.
- @mqtt_source.json: Path to your configuration file.
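To run the same command from a script, for example in an automated deployment, you can wrap it with Python's subprocess module. This is only a sketch: the function names are hypothetical, demo-kafka is a placeholder service name, and it assumes the avn CLI is installed and authenticated.

```python
import shlex
import subprocess


def connector_create_command(service_name, config_path="mqtt_source.json"):
    """Return the CLI command from this section as an argument list."""
    return ["avn", "service", "connector", "create", service_name, f"@{config_path}"]


def create_connector(service_name, config_path="mqtt_source.json"):
    """Run the command; raises CalledProcessError if the CLI exits non-zero."""
    subprocess.run(connector_create_command(service_name, config_path), check=True)


# Print the command instead of running it, so the sketch has no side effects.
# "demo-kafka" is a placeholder service name.
print(shlex.join(connector_create_command("demo-kafka")))
```

Call create_connector("demo-kafka") to run the command for real.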
Source MQTT data to a Kafka topic
The following example shows how to forward messages from an MQTT topic to a Kafka topic.
Suppose JSON messages like the following are published to an MQTT topic named devices/status:
{"device": "alpha", "status": "online"}
{"device": "beta", "status": "offline"}
To send this data to a Kafka topic named device_status, use the following connector configuration:
{
    "name": "mqtt-source-example",
    "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
    "connect.mqtt.hosts": "tcp://mqtt-broker.example.com:1883",
    "connect.mqtt.kcql": "INSERT INTO device_status SELECT * FROM devices/status",
    "connect.mqtt.username": "mqtt_user",
    "connect.mqtt.password": "mqtt_password",
    "connect.mqtt.service.quality": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
Replace all placeholder values (such as mqtt-broker.example.com, mqtt_user, and mqtt_password) with your actual connection details.
This configuration does the following:
- connect.mqtt.kcql: Routes messages from the MQTT topic devices/status to the Kafka topic device_status.
- connect.mqtt.service.quality: Uses QoS level 1 for at-least-once delivery.
- key.converter and value.converter: Set the message format to JSON.
- connect.mqtt.*: Supplies the MQTT connection details.
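As a sanity check before creating the connector, the routing and delivery guarantee described above can be read back from the configuration. The sketch below is illustrative only: summarize is a hypothetical helper, and the regular expression handles just the simple INSERT INTO ... SELECT * FROM ... form used in this example, not the full KCQL syntax.

```python
import re

# Delivery guarantees defined by MQTT for each QoS level.
QOS_GUARANTEES = {"0": "at most once", "1": "at least once", "2": "exactly once"}

# Matches only the simple KCQL form used in this example.
KCQL_PATTERN = re.compile(
    r"^\s*INSERT\s+INTO\s+(?P<kafka>\S+)\s+SELECT\s+\*\s+FROM\s+(?P<mqtt>\S+)\s*$",
    re.IGNORECASE,
)


def summarize(config):
    """Return the MQTT topic, Kafka topic, and delivery guarantee of a config."""
    match = KCQL_PATTERN.match(config["connect.mqtt.kcql"])
    if match is None:
        raise ValueError("KCQL statement is not in the simple form this sketch supports")
    qos = config["connect.mqtt.service.quality"]
    if qos not in QOS_GUARANTEES:
        raise ValueError("MQTT QoS must be 0, 1, or 2")
    return {
        "mqtt_topic": match.group("mqtt"),
        "kafka_topic": match.group("kafka"),
        "delivery": QOS_GUARANTEES[qos],
    }


# Only the two settings the check needs, taken from the example configuration.
example = {
    "connect.mqtt.kcql": "INSERT INTO device_status SELECT * FROM devices/status",
    "connect.mqtt.service.quality": "1",
}
print(summarize(example))
```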
After the connector is running, check the device_status Kafka topic to confirm that messages are being delivered.