Skip to main content

Create a sink connector from Apache Kafka® to InfluxDB®

The InfluxDB® Stream Reactor sink connector enables you to move data from an Aiven for Apache Kafka® cluster to an InfluxDB® instance. It uses KCQL transformations to filter and map topic data before writing it to InfluxDB.

caution

Version compatibility

Stream Reactor version 9.0.2 includes class and package name changes introduced in version 6.0.0 by Lenses. These changes standardize connector class names and converter package names.

Version 9.x is not compatible with version 4.2.0. To continue using version 4.2.0, set the connector version before upgrading.

If you are upgrading from version 4.2.0, you must recreate the connector using the updated class name. For example:

"connector.class": "io.lenses.streamreactor.connect.<connector_path>.<ConnectorClassName>"

For details about the changes, see the Stream Reactor release notes.

Prerequisites

  • An Aiven for Apache Kafka service with Apache Kafka Connect enabled or a dedicated Aiven for Apache Kafka Connect cluster.

  • Gather the following information for the target InfluxDB database:

  • INFLUXDB_HOST: The InfluxDB hostname.

  • INFLUXDB_PORT: The InfluxDB port.

  • INFLUXDB_DATABASE_NAME: The InfluxDB database name.

  • INFLUXDB_USERNAME: The InfluxDB username.

  • INFLUXDB_PASSWORD: The InfluxDB password.

  • TOPIC_LIST: A comma-separated list of Kafka topics to sink.

  • KCQL_TRANSFORMATION: A KCQL statement to map topic fields to InfluxDB measurements. Use the following format:

    INSERT INTO MEASUREMENT_NAME
    SELECT LIST_OF_FIELDS
    FROM APACHE_KAFKA_TOPIC
  • APACHE_KAFKA_HOST: The Apache Kafka host. Required only when using Avro as the data format.

  • SCHEMA_REGISTRY_PORT: The schema registry port. Required only when using Avro.

  • SCHEMA_REGISTRY_USER: The schema registry username. Required only when using Avro.

  • SCHEMA_REGISTRY_PASSWORD: The schema registry password. Required only when using Avro.

    note

    If you are using Aiven for InfluxDB and Aiven for Apache Kafka, get all required connection details, including schema registry information, from the Connection information section on the Overview page.

    As of version 3.0, Aiven for Apache Kafka uses Karapace as the schema registry and no longer supports the Confluent Schema Registry.

For a complete list of supported parameters and configuration options, see the connector's documentation.

Create a connector configuration file

Create a file named influxdb_sink.json and add the following configuration:

{
"name": "CONNECTOR_NAME",
"connector.class": "com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector",
"topics": "TOPIC_LIST",
"connect.influx.url": "https://INFLUXDB_HOST:INFLUXDB_PORT",
"connect.influx.db": "INFLUXDB_DATABASE_NAME",
"connect.influx.username": "INFLUXDB_USERNAME",
"connect.influx.password": "INFLUXDB_PASSWORD",
"connect.influx.kcql": "KCQL_TRANSFORMATION",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"
}

Parameters:

  • name: The connector name. Replace CONNECTOR_NAME with your desired name.
  • connect.influx.*: InfluxDB connection parameters collected in the prerequisite step.
  • topics: A comma-separated list of Kafka topics to sink.
  • key.converter and value.converter: Define the message data format in the Kafka topic. This example uses io.confluent.connect.avro.AvroConverter to translate messages in Avro format. The schema is retrieved from Aiven's Karapace schema registry using the schema.registry.url and related credentials.
note

The key.converter and value.converter fields define how Kafka messages are parsed. Include these fields in the connector configuration.

If you use Avro as the message format, set the following parameters:

  • value.converter.schema.registry.url: The Aiven for Apache Kafka schema registry URL, in the format https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT.
  • value.converter.basic.auth.credentials.source: Set to USER_INFO to enable authentication with a username and password.
  • value.converter.schema.registry.basic.auth.user.info: The schema registry credentials, in the format SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD.

You can get these values from the prerequisite step.

Create the connector

  1. Access the Aiven Console.

  2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.

  3. Click Connectors.

  4. Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.

    Alternatively, to enable connectors:

    1. Click Service settings in the sidebar.
    2. In the Service management section, click Actions > Enable Kafka connect.
  5. In the sink connectors list, select Stream Reactor InfluxDB Sink, and click Get started.

  6. On the Stream Reactor InfluxDB Sink page, go to the Common tab.

  7. Locate the Connector configuration text box and click Edit.

  8. Paste the configuration from your influxdb_sink.json file into the text box.

  9. Click Create connector.

  10. Verify the connector status on the Connectors page.

  11. Confirm that data is written to the InfluxDB target database.

Sink topic data to InfluxDB

The following example shows how to sink data from a Kafka topic to an InfluxDB measurement. If your Kafka topic measurements contains the following data:

{
"ts": "2022-10-24T13:09:43.406000Z",
"device_name": "mydevice1",
"measurement": 17
}

To write this data to InfluxDB, use the following connector configuration:

{
"name": "my-influxdb-sink",
"connector.class": "com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector",
"topics": "measurements",
"connect.influx.url": "https://INFLUXDB_HOST:INFLUXDB_PORT",
"connect.influx.db": "INFLUXDB_DATABASE_NAME",
"connect.influx.username": "INFLUXDB_USERNAME",
"connect.influx.password": "INFLUXDB_PASSWORD",
"connect.influx.kcql": "INSERT INTO measurements SELECT ts, device_name, measurement FROM measurements",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}

Replace all placeholder values (such as INFLUXDB_HOST, INFLUXDB_PORT, and INFLUXDB_PASSWORD) with your actual InfluxDB connection details.

This configuration does the following:

  • "topics": "measurements": Specifies the Kafka topic to sink.
  • Connection settings (connect.influx.*): Provide the InfluxDB connection details.
  • "value.converter" and "value.converter.schemas.enable": Set the message format. The topic uses raw JSON without a schema.
  • "connect.influx.kcql": Defines the insert logic. Each Kafka message is written as a row in the measurements table.

After creating the connector, verify the presence of the data in the target InfluxDB database. The measurement name matches the Kafka topic (measurements).