Create a source connector from MQTT to Apache Kafka®

The Stream Reactor MQTT source connector transfers messages from an MQTT topic to an Aiven for Apache Kafka® topic, where they can be processed and consumed by multiple applications. It creates a queue and binds it to the amq.topic exchange specified in the KCQL statement, then forwards the messages to Kafka.

Tip: You can use this connector to source messages from RabbitMQ® if the RabbitMQ MQTT plugin is enabled.

Caution: Version compatibility

Stream Reactor version 9.0.2 includes class and package name changes introduced in version 6.0.0 by Lenses. These changes standardize connector class names and converter package names.

Version 9.x is not compatible with version 4.2.0. To continue using version 4.2.0, set the connector version before upgrading.

If you are upgrading from version 4.2.0, you must recreate the connector using the updated class name. For example:

"connector.class": "io.lenses.streamreactor.connect.<connector_path>.<ConnectorClassName>"

For details about the changes, see the Stream Reactor release notes.
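
As a rough illustration of the check implied above, the following sketch flags a configuration whose connector.class still uses the older package prefix. The helper name uses_legacy_class and both prefix constants are hypothetical; the legacy prefix is taken from the configuration examples in this document and the current prefix from the pattern shown above.

```python
# Hypothetical helper, not part of Stream Reactor or any Aiven tooling.
# Assumption: the legacy prefix is the one used in the 4.2.0-style examples
# in this document; the current prefix follows the io.lenses pattern above.
LEGACY_PREFIX = "com.datamountaineer.streamreactor.connect."
CURRENT_PREFIX = "io.lenses.streamreactor.connect."


def uses_legacy_class(config: dict) -> bool:
    """Return True if connector.class starts with the legacy package prefix."""
    return str(config.get("connector.class", "")).startswith(LEGACY_PREFIX)


def uses_current_class(config: dict) -> bool:
    """Return True if connector.class starts with the io.lenses package prefix."""
    return str(config.get("connector.class", "")).startswith(CURRENT_PREFIX)
```

A configuration for which uses_legacy_class returns True would likely need to be recreated with the updated class name before running on version 9.x.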

Prerequisites

  • An Aiven for Apache Kafka service with Apache Kafka Connect enabled or a dedicated Aiven for Apache Kafka Connect cluster.

  • Collect the following details for your MQTT source:

    • HOST: The MQTT hostname.

    • PORT: The MQTT port (usually 1883).

    • USERNAME: The MQTT username.

    • PASSWORD: The MQTT password.

    • KCQL_STATEMENT: A KCQL mapping from MQTT to Kafka in the format: INSERT INTO <your-kafka-topic> SELECT * FROM <your-mqtt-topic>

    • APACHE_KAFKA_HOST: The Kafka host (required only when using Avro).

    • SCHEMA_REGISTRY_PORT: The schema registry port (required only when using Avro).

    • SCHEMA_REGISTRY_USER: The schema registry username (required only when using Avro).

    • SCHEMA_REGISTRY_PASSWORD: The schema registry password (required only when using Avro).

Tip: The connector writes to the Kafka topic defined in the connect.mqtt.kcql parameter. Make sure the topic exists, or enable automatic topic creation.
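
The KCQL_STATEMENT format listed in the prerequisites can be assembled from the two topic names. The sketch below is a minimal illustration of that format only; build_kcql is a hypothetical helper name, and it does not cover any KCQL clauses beyond the INSERT INTO ... SELECT * FROM ... form shown above.

```python
def build_kcql(kafka_topic: str, mqtt_topic: str) -> str:
    """Build a KCQL mapping in the form used in this document.

    Hypothetical helper: it only formats the statement and does not
    validate that either topic actually exists.
    """
    if not kafka_topic or not mqtt_topic:
        raise ValueError("both the Kafka topic and the MQTT topic are required")
    return f"INSERT INTO {kafka_topic} SELECT * FROM {mqtt_topic}"


# Example with illustrative topic names.
statement = build_kcql("device_status", "devices/status")
```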

Create a connector configuration file

Create a file named mqtt_source.json and add the following configuration:

{
"name": "CONNECTOR_NAME",
"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
"connect.mqtt.hosts": "tcp://HOST:PORT",
"connect.mqtt.kcql": "KCQL_STATEMENT",
"connect.mqtt.username": "USERNAME",
"connect.mqtt.password": "PASSWORD",
"connect.mqtt.service.quality": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}

Parameters:

  • name: The connector name. Replace CONNECTOR_NAME with your desired name.
  • connector.class: The connector class. The value shown uses the legacy com.datamountaineer package name from version 4.2.0. For version 9.x, use the io.lenses.streamreactor.connect class name pattern described in Version compatibility.
  • connect.mqtt.*: MQTT connection details collected in the prerequisite step.
  • connect.mqtt.kcql: A KCQL statement that maps MQTT topics to Kafka topics.
  • connect.mqtt.service.quality: Sets the MQTT Quality of Service (QoS) level: 0 (at most once), 1 (at least once), or 2 (exactly once).
  • key.converter and value.converter: Set the message format. This example uses raw JSON.
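
If you prefer to generate mqtt_source.json rather than edit it by hand, a small script along these lines could fill in the template. This is a sketch under assumptions: the function names are hypothetical, and the connector.class value is copied from the template above, so it may need the updated class name depending on your Stream Reactor version.

```python
import json


def build_mqtt_source_config(name, host, port, username, password, kcql, qos=1):
    """Return the connector configuration as a dict, mirroring the template above."""
    if qos not in (0, 1, 2):
        raise ValueError("MQTT QoS must be 0, 1, or 2")
    return {
        "name": name,
        # Copied from the template; adjust for your Stream Reactor version.
        "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
        "connect.mqtt.hosts": f"tcp://{host}:{port}",
        "connect.mqtt.kcql": kcql,
        "connect.mqtt.username": username,
        "connect.mqtt.password": password,
        # The template passes the QoS level as a string.
        "connect.mqtt.service.quality": str(qos),
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    }


def write_config(config, path="mqtt_source.json"):
    """Write the configuration to disk as indented JSON."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(config, handle, indent=2)
```

Calling write_config(build_mqtt_source_config(...)) with your own values produces a file you can paste into the console in the next section.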

Create the connector

  1. Access the Aiven Console.

  2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.

  3. Click Connectors.

  4. Click Create connector. If Kafka Connect is not yet enabled, click Enable connector on this service.

    Alternatively:

    • Go to Service settings.
    • In the Service management section, click Actions > Enable Kafka connect.

  5. From the source connectors list, select Stream Reactor MQTT Source, then click Get started.

  6. On the Common tab, locate the Connector configuration text box and click Edit.

  7. Paste the contents of your mqtt_source.json file.

  8. Click Create connector.

  9. Verify the connector status on the Connectors page.
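
Before pasting the file in step 7, it can help to confirm that it parses as JSON and that no template placeholder was left in place. The following is a minimal sketch, not an Aiven tool: find_config_problems is a hypothetical name, and the set of keys treated as required is an assumption based on the template in this document.

```python
import json

# Assumption: these keys are treated as required for this check only.
REQUIRED_KEYS = ("name", "connector.class", "connect.mqtt.hosts", "connect.mqtt.kcql")

# Placeholder values exactly as they appear in the template.
TEMPLATE_VALUES = {"CONNECTOR_NAME", "tcp://HOST:PORT", "KCQL_STATEMENT", "USERNAME", "PASSWORD"}


def find_config_problems(text: str) -> list:
    """Return a list of human-readable problems found in the config text."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as err:
        return [f"invalid JSON: {err.msg}"]
    if not isinstance(config, dict):
        return ["configuration must be a JSON object"]
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]
    problems += [
        f"placeholder not replaced: {key}"
        for key, value in config.items()
        if isinstance(value, str) and value in TEMPLATE_VALUES
    ]
    return problems
```

An empty list means the file passed these basic checks; it does not guarantee that the connector can reach the MQTT broker.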

Source MQTT data to a Kafka topic

The following example shows how to forward messages from an MQTT topic to a Kafka topic.

Suppose MQTT clients publish JSON messages such as the following to a topic named devices/status on your broker:

{"device": "alpha", "status": "online"}
{"device": "beta", "status": "offline"}

To send this data to a Kafka topic named device_status, use the following connector configuration:

{
"name": "mqtt-source-example",
"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
"connect.mqtt.hosts": "tcp://mqtt-broker.example.com:1883",
"connect.mqtt.kcql": "INSERT INTO device_status SELECT * FROM devices/status",
"connect.mqtt.username": "mqtt_user",
"connect.mqtt.password": "mqtt_password",
"connect.mqtt.service.quality": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}

Replace all placeholder values (such as mqtt-broker.example.com, mqtt_user, and mqtt_password) with your actual connection details.

This configuration does the following:

  • connect.mqtt.kcql: Routes messages from the MQTT topic devices/status to the Kafka topic device_status.
  • connect.mqtt.service.quality: Uses QoS level 1 for at-least-once delivery.
  • key.converter and value.converter: Set the message format to JSON.
  • connect.mqtt.*: Supplies the MQTT connection details.
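
To double-check the routing described in the list above, the KCQL statement can be split back into its target and source topics. The sketch below handles only the simple INSERT INTO ... SELECT * FROM ... form used in this document; KCQL supports additional clauses that this pattern does not cover, and parse_simple_kcql is a hypothetical helper name.

```python
import re

# Matches only the simple form: INSERT INTO <kafka-topic> SELECT * FROM <mqtt-topic>
KCQL_PATTERN = re.compile(
    r"^\s*INSERT\s+INTO\s+(?P<kafka_topic>\S+)\s+SELECT\s+\*\s+FROM\s+(?P<mqtt_topic>\S+)\s*$",
    re.IGNORECASE,
)


def parse_simple_kcql(statement: str) -> tuple:
    """Return (kafka_topic, mqtt_topic) for a simple KCQL mapping."""
    match = KCQL_PATTERN.match(statement)
    if match is None:
        raise ValueError(f"not a simple KCQL mapping: {statement!r}")
    return match.group("kafka_topic"), match.group("mqtt_topic")


kafka_topic, mqtt_topic = parse_simple_kcql(
    "INSERT INTO device_status SELECT * FROM devices/status"
)
```

Here kafka_topic is the topic to inspect once the connector is running, and mqtt_topic is the topic the connector reads from.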

After the connector is running, check the device_status Kafka topic to confirm that messages are being delivered.