Skip to main content

Create an MQTT sink connector

The MQTT sink connector copies messages from an Apache Kafka® topic to an MQTT queue.

tip

You can use this connector to send messages to RabbitMQ® when the RabbitMQ MQTT plugin is enabled.

caution

Version compatibility

Stream Reactor version 9.0.2 includes class and package name changes introduced in version 6.0.0 by Lenses. These changes standardize connector class names and converter package names.

Version 9.x is not compatible with version 4.2.0. To continue using version 4.2.0, set the connector version before upgrading.

If you are upgrading from version 4.2.0, you must recreate the connector using the updated class name. For example:

"connector.class": "io.lenses.streamreactor.connect.<connector_path>.<ConnectorClassName>"

For details about the changes, see the Stream Reactor release notes.

Prerequisites

  • An Aiven for Apache Kafka service with Kafka Connect enabled or a dedicated Aiven for Apache Kafka Connect cluster.

  • Gather the following information for the target MQTT server:

    • USERNAME: The MQTT username.

    • PASSWORD: The MQTT password.

    • HOST: The MQTT hostname.

    • PORT: The MQTT port (typically 1883).

    • KCQL_STATEMENT: A KCQL statement that maps topic data to the MQTT topic. Use the following format:

      INSERT INTO MQTT_TOPIC
      SELECT LIST_OF_FIELDS
      FROM APACHE_KAFKA_TOPIC
    • APACHE_KAFKA_HOST: The Apache Kafka host. Required only when using Avro.

    • SCHEMA_REGISTRY_PORT: The schema registry port. Required only when using Avro.

    • SCHEMA_REGISTRY_USER: The schema registry username. Required only when using Avro.

    • SCHEMA_REGISTRY_PASSWORD: The schema registry password. Required only when using Avro.

note

The connector writes to the Kafka topic defined in the connect.mqtt.kcql parameter. Either create the topic manually or enable the auto_create_topic parameter to allow automatic topic creation.

For a complete list of parameters and configuration options, see the connector documentation.

Create the connector configuration file

Create a file named mqtt_sink.json and add the following configuration:

{
"name": "CONNECTOR_NAME",
"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector",
"connect.mqtt.hosts": "tcp://HOST:PORT",
"connect.mqtt.kcql": "KCQL_STATEMENT",
"connect.mqtt.username": "USERNAME",
"connect.mqtt.password": "PASSWORD",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}

Parameters:

  • name: The connector name. Replace CONNECTOR_NAME with your desired name.
  • connect.mqtt.*: MQTT server connection parameters collected in the prerequisite step.
  • key.converter and value.converter: Define the message data format in the Kafka topic. This example uses JsonConverter for both key and value.

For a full list of supported parameters, see the Stream Reactor MQTT sink documentation.

Create the connector

  1. Access the Aiven Console.

  2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.

  3. Click Connectors.

  4. Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.

    Alternatively, to enable connectors:

    1. Click Service settings in the sidebar.
    2. In the Service management section, click Actions > Enable Kafka connect.
  5. In the sink connectors list, select Stream Reactor MQTT Sink Connector, and click Get started.

  6. On the Stream Reactor MQTT Sink page, go to the Common tab.

  7. Locate the Connector configuration text box and click Edit.

  8. Paste the configuration from your mqtt_sink.json file into the text box.

  9. Click Create connector.

  10. Verify the connector status on the Connectors page.

  11. Confirm that data is delivered to the MQTT topic defined in the KCQL_STATEMENT.

Sink topic data to an MQTT topic

The following example shows how to sink data from a Kafka topic to an MQTT topic. If your Kafka topic sensor_data contains the following messages:

{"device":"sensor-1", "temperature": 22.5}
{"device":"sensor-2", "temperature": 19.0}
{"device":"sensor-1", "temperature": 23.1}

To write this data to an MQTT topic named iot/devices/temperature, use the following connector configuration:

{
"name": "my-mqtt-sink",
"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector",
"topics": "sensor_data",
"connect.mqtt.hosts": "tcp://MQTT_HOST:MQTT_PORT",
"connect.mqtt.username": "MQTT_USERNAME",
"connect.mqtt.password": "MQTT_PASSWORD",
"connect.mqtt.kcql": "INSERT INTO iot/devices/temperature SELECT * FROM sensor_data",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}

Replace all placeholder values (such as MQTT_HOST, MQTT_PORT, and MQTT_USERNAME) with your actual MQTT broker connection details.

This configuration does the following:

  • "topics": "sensor_data": Specifies the Kafka topic to sink.
  • connect.mqtt.*: Sets the MQTT broker hostname, port, credentials, and KCQL rules.
  • "value.converter" and "value.converter.schemas.enable": Set the message format. This example uses raw JSON without a schema.
  • "connect.mqtt.kcql": Defines the KCQL transformation. Each Kafka message is published to the MQTT topic iot/devices/temperature.

After creating the connector, check your MQTT broker to verify that messages are published to the iot/devices/temperature topic.