Create an Amazon S3 sink connector by Confluent from Apache Kafka®

The Apache Kafka Connect® S3 sink connector by Confluent enables you to move data from an Aiven for Apache Kafka® cluster to Amazon S3 for long-term storage.

note
  • There are two versions of the S3 sink connector available with Aiven for Apache Kafka Connect®: an Aiven version and a Confluent version. This article uses the Confluent version. To use the S3 sink connector by Aiven, see Amazon S3 sink connector by Aiven.

  • See the full set of available parameters and configuration options in the connector's documentation.

Prerequisites

To create the connector you need:

  • An Aiven for Apache Kafka® service with Apache Kafka Connect® enabled, or a dedicated Aiven for Apache Kafka Connect® service
  • An existing Amazon S3 bucket, its name, and the AWS region where it was created
  • An AWS user with permission to write to the bucket, together with its access key ID and secret access key

Set up an S3 sink connector with Aiven CLI

The following example demonstrates how to set up an Apache Kafka Connect® S3 sink connector using the dedicated Aiven CLI command.

Define a Kafka Connect® configuration file

Define the connector configuration in a file (referred to here as s3_sink.json) with the following content:

{
    "name": "<CONNECTOR_NAME>",
    "topics": "<TOPIC_NAME>",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "flush.size": "1",
    "s3.bucket.name": "<AWS_S3_NAME>",
    "s3.region": "<AWS_S3_REGION>",
    "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.credentials.provider.secret_access_key": "<AWS_USER_SECRET_ACCESS_KEY>",
    "s3.credentials.provider.access_key_id": "<AWS_USER_ACCESS_KEY_ID>"
}

The configuration file contains the following entries:

  • name: The connector name
  • topics: The list of Apache Kafka® topics to sink to the S3 bucket
  • key.converter and value.converter: Data converters, chosen depending on the topic data format. Check the related documentation for more information
  • format.class: Defines the output data format in the S3 bucket. The io.confluent.connect.s3.format.bytearray.ByteArrayFormat writes messages in binary format.
  • flush.size: Defines how many messages to write per file in the S3 bucket. For example, setting flush.size to 3 generates a file every three messages in a topic and partition (see the producer sketch below).
  • s3.bucket.name: The name of the S3 bucket
  • s3.region: The AWS region where the S3 bucket has been created
  • s3.credentials.provider.class: The name of the class implementing the com.amazonaws.auth.AWSCredentialsProvider and org.apache.kafka.common.Configurable interfaces. Use io.aiven.kafka.connect.util.AivenAWSCredentialsProvider.
  • storage.class: The underlying storage layer; io.confluent.connect.s3.storage.S3Storage is the S3 implementation
  • s3.credentials.provider.secret_access_key: The AWS user secret access key
  • s3.credentials.provider.access_key_id: The AWS user access key ID

See the dedicated documentation for the full list of configuration options.
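
No file appears in the bucket until at least flush.size messages have been produced to a topic partition. The following is a minimal sketch for producing a few test messages, assuming the kafka-python library, a placeholder service URI, and the SSL certificates (ca.pem, service.cert, service.key) downloaded from the Aiven console; adjust names and paths to your setup:

# Minimal sketch: produce test messages so the S3 sink has data to write.
# The service URI, certificate paths, and topic name are placeholders.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="<KAFKA_SERVICE_URI>",  # host:port from the Aiven console
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
)

for i in range(10):
    producer.send("<TOPIC_NAME>", value=f"test message {i}".encode("utf-8"))

producer.flush()
producer.close()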

Create an S3 sink connector with Aiven CLI

To create the connector, execute the following Aiven CLI command, replacing SERVICE_NAME with the name of the existing Aiven for Apache Kafka® service where the connector needs to run:

avn service connector create SERVICE_NAME @s3_sink.json

Check the connector status with the following command, replacing SERVICE_NAME with the name of the existing Aiven for Apache Kafka® service and CONNECTOR_NAME with the name of the connector defined earlier:

avn service connector status SERVICE_NAME CONNECTOR_NAME

With the connector in place, verify that data is flowing to the target S3 bucket.
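
One quick way to check is to list the bucket contents, for example with a short Python script using boto3 and the same bucket, region, and AWS credentials configured for the connector (a minimal sketch; with the connector defaults, object keys typically appear under a topics/<topic>/partition=<N>/ prefix):

# Minimal sketch: list the objects the connector has written to the S3 bucket.
# Uses the same bucket, region, and AWS credentials as the connector configuration.
import boto3

s3 = boto3.client(
    "s3",
    region_name="<AWS_S3_REGION>",
    aws_access_key_id="<AWS_USER_ACCESS_KEY_ID>",
    aws_secret_access_key="<AWS_USER_SECRET_ACCESS_KEY>",
)

response = s3.list_objects_v2(Bucket="<AWS_S3_NAME>")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])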

Example: define an S3 sink connector

The example creates an S3 sink connector with the following properties:

  • connector name: my_s3_sink
  • source topics: students
  • target S3 bucket name: my-test-bucket
  • target S3 bucket region: eu-central-1
  • AWS user access key ID: AKIAXXXXXXXXXX
  • AWS user secret access key: hELuXXXXXXXXXXXXXXXXXXXXXXXXXX
  • generating a file in the S3 bucket every 10 messages

The connector configuration is the following:

{
    "name": "my_s3_sink",
    "topics": "students",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "flush.size": "10",
    "s3.bucket.name": "my-test-bucket",
    "s3.region": "eu-central-1",
    "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.credentials.provider.secret_access_key": "hELuXXXXXXXXXXXXXXXXXXXXXXXXXX",
    "s3.credentials.provider.access_key_id": "AKIAXXXXXXXXXX"
}

With the above configuration stored in a file named s3_sink.json, you can create the connector in the demo-kafka instance with:

avn service connector create demo-kafka @s3_sink.json
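
Once the connector is created, check its status with:

avn service connector status demo-kafka my_s3_sink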