Create an MQTT sink connector
The MQTT sink connector copies messages from an Apache Kafka® topic to an MQTT topic.
You can use this connector to send messages to RabbitMQ® when the RabbitMQ MQTT plugin is enabled.
Version compatibility
Stream Reactor version 9.0.2 includes class and package name changes introduced in version 6.0.0 by Lenses. These changes standardize connector class names and converter package names.
Version 9.x is not compatible with version 4.2.0. To continue using version 4.2.0, set the connector version before upgrading.
If you are upgrading from version 4.2.0, you must recreate the connector using the updated class name. For example:
"connector.class": "io.lenses.streamreactor.connect.<connector_path>.<ConnectorClassName>"
For details about the changes, see the Stream Reactor release notes.
Prerequisites
- An Aiven for Apache Kafka service with Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect cluster.
- Gather the following information for the target MQTT server:
  - `USERNAME`: The MQTT username.
  - `PASSWORD`: The MQTT password.
  - `HOST`: The MQTT hostname.
  - `PORT`: The MQTT port (typically `1883`).
  - `KCQL_STATEMENT`: A KCQL statement that maps topic data to the MQTT topic. Use the following format:

    ```
    INSERT INTO MQTT_TOPIC
    SELECT LIST_OF_FIELDS
    FROM APACHE_KAFKA_TOPIC
    ```

  - `APACHE_KAFKA_HOST`: The Apache Kafka host. Required only when using Avro.
  - `SCHEMA_REGISTRY_PORT`: The schema registry port. Required only when using Avro.
  - `SCHEMA_REGISTRY_USER`: The schema registry username. Required only when using Avro.
  - `SCHEMA_REGISTRY_PASSWORD`: The schema registry password. Required only when using Avro.
- The connector reads data from the Kafka topic defined in the `connect.mqtt.kcql` parameter. This topic must exist: either create it manually or enable the `auto_create_topic` parameter to allow automatic topic creation.
For a complete list of parameters and configuration options, see the connector documentation.
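The `KCQL_STATEMENT` is plain text that you interpolate into the connector configuration. As a minimal sketch, the mapping can be assembled from the values gathered above (the topic and field names here are hypothetical examples, not values from this guide):

```python
# Sketch: assemble a KCQL statement for the MQTT sink connector.
# All topic and field names below are hypothetical examples.
def build_kcql(mqtt_topic, fields, kafka_topic):
    # An empty field list falls back to selecting all fields.
    field_list = ", ".join(fields) if fields else "*"
    return f"INSERT INTO {mqtt_topic} SELECT {field_list} FROM {kafka_topic}"

kcql = build_kcql("iot/devices/temperature", ["device", "temperature"], "sensor_data")
print(kcql)
# INSERT INTO iot/devices/temperature SELECT device, temperature FROM sensor_data
```

The resulting string goes into the `connect.mqtt.kcql` parameter of the configuration file.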
Create the connector configuration file
Create a file named `mqtt_sink.json` and add the following configuration:

```json
{
    "name": "CONNECTOR_NAME",
    "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector",
    "connect.mqtt.hosts": "tcp://HOST:PORT",
    "connect.mqtt.kcql": "KCQL_STATEMENT",
    "connect.mqtt.username": "USERNAME",
    "connect.mqtt.password": "PASSWORD",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
```
Parameters:

- `name`: The connector name. Replace `CONNECTOR_NAME` with your desired name.
- `connect.mqtt.*`: MQTT server connection parameters collected in the prerequisite step.
- `key.converter` and `value.converter`: Define the message data format in the Kafka topic. This example uses `JsonConverter` for both the key and the value.
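Before submitting the file, it can help to check that every placeholder was replaced and the expected keys are present. The sketch below illustrates one way to do this; the required-key and placeholder lists are assumptions drawn from the example configuration above, not an official schema:

```python
import json

# Keys used in the mqtt_sink.json example above (an illustrative list,
# not an official required-field schema).
REQUIRED_KEYS = {
    "name",
    "connector.class",
    "connect.mqtt.hosts",
    "connect.mqtt.kcql",
    "key.converter",
    "value.converter",
}

# Placeholder tokens from the template above that must be replaced.
PLACEHOLDERS = {"CONNECTOR_NAME", "HOST", "PORT", "USERNAME", "PASSWORD", "KCQL_STATEMENT"}

def check_config(raw):
    """Return a list of problems found in a connector config JSON string."""
    config = json.loads(raw)
    problems = [f"missing key: {k}" for k in sorted(REQUIRED_KEYS - config.keys())]
    for key, value in config.items():
        if isinstance(value, str) and any(p in value for p in PLACEHOLDERS):
            problems.append(f"placeholder not replaced: {key}")
    return problems
```

Running `check_config` on the unedited template flags every placeholder, while a fully filled-in configuration returns an empty list.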
For a full list of supported parameters, see the Stream Reactor MQTT sink documentation.
Create the connector
- Console
- CLI
1. Access the Aiven Console.
2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
3. Click Connectors.
4. Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.

   Alternatively, to enable connectors:
   - Click Service settings in the sidebar.
   - In the Service management section, click Actions > Enable Kafka connect.
5. In the sink connectors list, select Stream Reactor MQTT Sink Connector, and click Get started.
6. On the Stream Reactor MQTT Sink page, go to the Common tab.
7. Locate the Connector configuration text box and click Edit.
8. Paste the configuration from your `mqtt_sink.json` file into the text box.
9. Click Create connector.
10. Verify the connector status on the Connectors page.
11. Confirm that data is delivered to the MQTT topic defined in the `KCQL_STATEMENT`.
To create the connector using the Aiven CLI, run:
```bash
avn service connector create SERVICE_NAME @mqtt_sink.json
```

Replace:

- `SERVICE_NAME`: Your Kafka or Kafka Connect service name.
- `@mqtt_sink.json`: Path to your connector configuration file.
Sink topic data to an MQTT topic
The following example shows how to sink data from a Kafka topic to an MQTT topic. If your Kafka topic `sensor_data` contains the following messages:

```
{"device":"sensor-1", "temperature": 22.5}
{"device":"sensor-2", "temperature": 19.0}
{"device":"sensor-1", "temperature": 23.1}
```

To write this data to an MQTT topic named `iot/devices/temperature`, use the following connector configuration:
```json
{
    "name": "my-mqtt-sink",
    "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector",
    "topics": "sensor_data",
    "connect.mqtt.hosts": "tcp://MQTT_HOST:MQTT_PORT",
    "connect.mqtt.username": "MQTT_USERNAME",
    "connect.mqtt.password": "MQTT_PASSWORD",
    "connect.mqtt.kcql": "INSERT INTO iot/devices/temperature SELECT * FROM sensor_data",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
}
```
Replace all placeholder values (such as `MQTT_HOST`, `MQTT_PORT`, and `MQTT_USERNAME`) with your actual MQTT broker connection details.
This configuration does the following:
"topics": "sensor_data"
: Specifies the Kafka topic to sink.connect.mqtt.*
: Sets the MQTT broker hostname, port, credentials, and KCQL rules."value.converter"
and"value.converter.schemas.enable"
: Set the message format. This example uses raw JSON without a schema."connect.mqtt.kcql"
: Defines the KCQL transformation. Each Kafka message is published to the MQTT topiciot/devices/temperature
.
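Because `value.converter.schemas.enable` is `false`, each record's JSON value maps onto the MQTT payload without a schema envelope. The sketch below decodes the sample records to show what a subscriber on `iot/devices/temperature` would receive, assuming the connector publishes the value unmodified:

```python
import json

# Sample record values from the sensor_data Kafka topic above.
records = [
    '{"device":"sensor-1", "temperature": 22.5}',
    '{"device":"sensor-2", "temperature": 19.0}',
    '{"device":"sensor-1", "temperature": 23.1}',
]

# With schemas disabled, the plain JSON value becomes the MQTT payload,
# so each message carries these fields as-is.
payloads = [json.loads(r) for r in records]
for p in payloads:
    print("iot/devices/temperature:", p["device"], p["temperature"])
```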
After creating the connector, check your MQTT broker to verify that messages are published to the `iot/devices/temperature` topic.