Create an Amazon S3 sink connector by Confluent from Apache Kafka®

The Amazon S3 sink connector by Confluent moves data from Apache Kafka® topics to Amazon S3 buckets for long-term storage.

For the full list of configuration options, see the Confluent S3 sink connector documentation.

Prerequisites

  • An Aiven for Apache Kafka® service with Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service
  • An Amazon S3 bucket in your target AWS region
  • AWS credentials (access key ID and secret access key) with write permissions for the bucket

Create the connector configuration file

Create a file named s3_sink_confluent.json with the following configuration:

{
    "name": "s3_sink_confluent",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "my-topic",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "flush.size": "1",
    "s3.bucket.name": "my-bucket",
    "s3.region": "eu-central-1",
    "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.credentials.provider.access_key_id": "<AWS_USER_ACCESS_KEY_ID>",
    "s3.credentials.provider.secret_access_key": "<AWS_USER_SECRET_ACCESS_KEY>"
}

Parameters:

  • connector.class: The Confluent S3 sink connector class
  • topics: Comma-separated list of Kafka topics to sink
  • key.converter / value.converter: Converters used to read record keys and values (raw byte arrays in this example)
  • format.class: Output format for the S3 objects (byte array in this example)
  • flush.size: Number of records written to each S3 object before it is committed
  • s3.bucket.name / s3.region: Destination S3 bucket and its AWS region
  • s3.credentials.provider.class: Credentials provider; the Aiven provider reads the AWS keys from s3.credentials.provider.access_key_id and s3.credentials.provider.secret_access_key

For all options, see the Confluent connector documentation.
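
If you manage connectors through the Kafka Connect REST API instead of the Aiven Console or CLI, the same settings apply, but the API expects the connector name and its configuration as separate fields. The following is a minimal Python sketch, assuming the requests library is installed; the endpoint URL and credentials are placeholders, not values from this guide:

import json
import requests

CONNECT_URL = "https://<connect-host>:<port>"   # placeholder: your Kafka Connect REST endpoint
AUTH = ("<username>", "<password>")             # placeholder: your service credentials

# Load the flat configuration file used in this guide.
with open("s3_sink_confluent.json") as f:
    flat = json.load(f)

# The Connect REST API expects {"name": ..., "config": {...}},
# unlike the flat file accepted by the Aiven Console and CLI.
payload = {"name": flat.pop("name"), "config": flat}

response = requests.post(f"{CONNECT_URL}/connectors", json=payload, auth=AUTH, timeout=30)
response.raise_for_status()
print(response.json())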

Create the connector

  1. Open the Aiven Console.
  2. Select your Kafka or Kafka Connect service.
  3. Click Connectors.
  4. Click Create connector.
  5. Search for Amazon S3 sink (Confluent) and click Get started.
  6. On the connector page, open the Common tab.
  7. In Connector configuration, click Edit.
  8. Paste the contents of your s3_sink_confluent.json file.
  9. Click Create connector.
  10. Verify the connector status on the Connectors page.
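
To verify the connector state programmatically rather than in the Console, you can query the status endpoint of the Kafka Connect REST API. A minimal sketch, reusing the same placeholder endpoint and credentials as in the earlier example:

import requests

CONNECT_URL = "https://<connect-host>:<port>"  # placeholder: your Kafka Connect REST endpoint
AUTH = ("<username>", "<password>")            # placeholder: your service credentials

# Query the standard Connect status endpoint for this connector.
status = requests.get(
    f"{CONNECT_URL}/connectors/s3_sink_confluent/status", auth=AUTH, timeout=30
).json()

print(status["connector"]["state"])  # expected: "RUNNING"
for task in status["tasks"]:
    print(task["id"], task["state"])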

Example: create an Amazon S3 sink connector (Confluent)

This example creates an S3 sink connector that writes data from the students topic to the my-test-bucket bucket in the eu-central-1 region, committing one file to S3 for every 10 messages (flush.size).

{
    "name": "my_s3_sink",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "students",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "flush.size": "10",
    "s3.bucket.name": "my-test-bucket",
    "s3.region": "eu-central-1",
    "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.credentials.provider.access_key_id": "AKIAXXXXXXXXXX",
    "s3.credentials.provider.secret_access_key": "hELuXXXXXXXXXXXXXXXXXXXXXXXXXX"
}

Save this as s3_sink_confluent.json, then create the connector:

avn service connector create demo-kafka @s3_sink_confluent.json
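
To check the connector state from the CLI, run the status subcommand with your service name and the connector name from the name field (assuming your Aiven CLI version includes it):

avn service connector status demo-kafka my_s3_sink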

After creation, confirm that data from the students topic appears in the my-test-bucket S3 bucket.
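
As a quick programmatic check, you can list the objects the connector created. The following is a minimal sketch using boto3, assuming your AWS credentials are available in the environment and the connector uses its default topics/ directory layout (it changes if you customize topics.dir):

import boto3

# List objects written by the connector; by default the Confluent S3 sink
# stores files under <topics.dir>/<topic>/partition=<n>/, with "topics" as
# the default directory.
s3 = boto3.client("s3", region_name="eu-central-1")
response = s3.list_objects_v2(Bucket="my-test-bucket", Prefix="topics/students/")

for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])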

Related pages

Amazon S3 sink connector by Aiven