Create an Amazon S3 sink connector from Aiven for Apache Kafka®

The Amazon S3 sink connector sends data from Aiven for Apache Kafka® to Amazon S3 for long-term storage.

View the full configuration reference in the S3 sink connector documentation.

Prerequisites

To use AWS IAM assume role credentials, see Use AWS IAM assume role credentials provider.

Create an S3 sink connector configuration file

Create a file named s3_sink_connector.json with the following configuration:

{
  "name": "s3_sink",
  "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
  "tasks.max": "1",
  "topics": "my-topic",
  "aws.access.key.id": "AKIAXXXXXXXXXX",
  "aws.secret.access.key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXX",
  "aws.s3.bucket.name": "my-bucket",
  "aws.s3.region": "eu-central-1",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}

Parameters:

  • name: Connector name

  • topics: Apache Kafka topics to sink data from

  • aws.access.key.id: AWS access key ID

  • aws.secret.access.key: AWS secret access key

  • aws.s3.bucket.name: S3 bucket name

  • aws.s3.region: AWS region

  • key.converter, value.converter: Converters for key and value formats

  • file.compression.type (optional): Compression type for output files. Supported values: gzip, snappy, zstd, none. The default is gzip.

    If you use the Amazon S3 source connector, set the same file.compression.type in the source configuration to ensure the source can read data written by the sink. The S3 source connector defaults to none.

For additional parameters (for example, file naming or output format), see S3 sink connector reference.
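Before submitting the file, it can help to verify that all required keys are present. The following Python sketch is illustrative, not an official Aiven tool; the required-key list is taken from the configuration shown above.

```python
import json

# Keys every S3 sink connector configuration in this guide must define.
REQUIRED_KEYS = {
    "name",
    "connector.class",
    "topics",
    "aws.access.key.id",
    "aws.secret.access.key",
    "aws.s3.bucket.name",
    "aws.s3.region",
}

def missing_keys(config: dict) -> set:
    """Return the required keys absent from the connector configuration."""
    return REQUIRED_KEYS - config.keys()

config = json.loads("""
{
  "name": "s3_sink",
  "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
  "tasks.max": "1",
  "topics": "my-topic",
  "aws.access.key.id": "AKIAXXXXXXXXXX",
  "aws.secret.access.key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXX",
  "aws.s3.bucket.name": "my-bucket",
  "aws.s3.region": "eu-central-1"
}
""")

print(sorted(missing_keys(config)))  # an empty list means all required keys are present
```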

Create the connector

  1. Open the Aiven Console.
  2. Select your Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service.
  3. Click Connectors.
  4. Click Create connector. If connectors are not enabled, click Enable connectors on this service.
  5. Search for Amazon S3 sink and click Get started.
  6. Go to the Common tab and click Edit in the Connector configuration section.
  7. Paste your s3_sink_connector.json content.
  8. Click Create connector.
  9. Confirm the connector is running in the Connectors view.
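The flat JSON file used above is accepted by the Aiven Console and the Aiven CLI. If you instead talk to a Kafka Connect REST API directly, its POST /connectors endpoint expects the connector name separated from the configuration, as {"name": ..., "config": {...}}. A small sketch of that transformation (illustrative only; the helper name is ours):

```python
import json

def to_connect_rest_payload(flat: dict) -> dict:
    """Split a flat connector configuration into the {"name", "config"}
    shape expected by the Kafka Connect REST API's POST /connectors endpoint."""
    return {
        "name": flat["name"],
        "config": {k: v for k, v in flat.items() if k != "name"},
    }

flat = {
    "name": "s3_sink",
    "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
    "tasks.max": "1",
    "topics": "my-topic",
}

payload = to_connect_rest_payload(flat)
print(json.dumps(payload, indent=2))
```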

Example: create an Amazon S3 sink connector

This example creates an S3 sink connector that writes data from the students topic to the my-test-bucket bucket in the eu-central-1 region.

{
  "name": "my_s3_sink",
  "connector.class": "io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector",
  "tasks.max": "1",
  "topics": "students",
  "aws.access.key.id": "AKIAXXXXXXXXXX",
  "aws.secret.access.key": "hELuXXXXXXXXXXXXXXXXXXXXXXXXXX",
  "aws.s3.bucket.name": "my-test-bucket",
  "aws.s3.region": "eu-central-1",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}

Save this as s3_sink_connector.json, then create the connector:

avn service connector create demo-kafka @s3_sink_connector.json

After creation, confirm that data from the students topic appears in the my-test-bucket S3 bucket.

The S3 sink connector compresses files using gzip by default. To read these files using the S3 source connector, set file.compression.type in the source configuration to gzip or another matching value.
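Because the objects are gzip-compressed by default, their raw bytes must be gunzipped before the record payloads are readable. The sketch below simulates a downloaded object with in-memory gzip data; in practice the bytes would come from S3 (for example via boto3's get_object), and the record content shown is hypothetical.

```python
import gzip
import io

# Simulate the bytes of a gzip-compressed object written by the sink.
# Real objects would be downloaded from the bucket first.
compressed = gzip.compress(b'{"student": "alice"}\n{"student": "bob"}\n')

# Decompress and split into the individual record payloads.
with gzip.open(io.BytesIO(compressed), "rt") as f:
    records = f.read().splitlines()

print(records)  # ['{"student": "alice"}', '{"student": "bob"}']
```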

Related pages

Amazon S3 sink connector by Confluent