Create an Amazon S3 sink connector by Confluent from Apache Kafka®
The Amazon S3 sink connector by Confluent moves data from Apache Kafka® topics to Amazon S3 buckets for long-term storage.
For the full list of configuration options, see the Confluent S3 sink connector documentation.
Prerequisites
- An Aiven for Apache Kafka® service with Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service.
- AWS access to an S3 bucket. See Prepare AWS for S3 sink to complete the bucket and IAM setup. After setup, you will have the required S3 bucket details and access credentials.
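If Kafka Connect is not yet enabled on your Aiven for Apache Kafka service, one way to enable it is through the Aiven CLI. A minimal sketch, assuming SERVICE_NAME is the name of your Kafka service:

avn service update SERVICE_NAME -c kafka_connect=true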
Create the connector configuration file
Create a file named s3_sink_confluent.json with the following configuration:
{
    "name": "s3_sink_confluent",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "my-topic",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "flush.size": "1",
    "s3.bucket.name": "my-bucket",
    "s3.region": "eu-central-1",
    "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.credentials.provider.access_key_id": "<AWS_USER_ACCESS_KEY_ID>",
    "s3.credentials.provider.secret_access_key": "<AWS_USER_SECRET_ACCESS_KEY>"
}
Parameters:
- connector.class: Uses the Confluent S3 sink connector
- topics: Kafka topics to sink
- format.class: Output format (byte array in this example)
- flush.size: Number of messages written per file in S3
- s3.bucket.name / s3.region: Destination S3 bucket and its region
- s3.credentials.provider.class: Use the Aiven provider to supply AWS credentials
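With flush.size set to 1, every Kafka record becomes its own S3 object. As a sketch of what to expect, the connector's default partitioner writes objects under keys of the form topics/&lt;topic&gt;/partition=&lt;partition&gt;/&lt;topic&gt;+&lt;partition&gt;+&lt;startOffset&gt;.bin (the .bin extension comes from the byte array format), so the first record from partition 0 of my-topic would land at a key similar to:

topics/my-topic/partition=0/my-topic+0+0000000000.bin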
For all options, see the Confluent connector documentation.
Create the connector
Aiven Console

- Open the Aiven Console.
- Select your Kafka or Kafka Connect service.
- Click Connectors.
- Click Create connector.
- Search for Amazon S3 sink (Confluent) and click Get started.
- On the connector page, go to the Common tab.
- Click Edit in Connector configuration.
- Paste the contents of your s3_sink_confluent.json file.
- Click Create connector.
- Verify the connector status on the Connectors page.

Aiven CLI
Create the connector:
avn service connector create SERVICE_NAME @s3_sink_confluent.json
Check the connector status:
avn service connector status SERVICE_NAME s3_sink_confluent
Example: create an Amazon S3 sink connector (Confluent)
This example creates an S3 sink connector that writes data from the students topic to
the my-test-bucket bucket in the eu-central-1 region, writing a file every 10 messages.
{
    "name": "my_s3_sink",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "students",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
    "flush.size": "10",
    "s3.bucket.name": "my-test-bucket",
    "s3.region": "eu-central-1",
    "s3.credentials.provider.class": "io.aiven.kafka.connect.util.AivenAWSCredentialsProvider",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.credentials.provider.access_key_id": "AKIAXXXXXXXXXX",
    "s3.credentials.provider.secret_access_key": "hELuXXXXXXXXXXXXXXXXXXXXXXXXXX"
}
Save this as s3_sink_confluent.json, then create the connector:
avn service connector create demo-kafka @s3_sink_confluent.json
After creation, confirm that data from the students topic appears in
the my-test-bucket S3 bucket.
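One way to check is with the AWS CLI, assuming it is installed and configured with credentials that can read the bucket:

aws s3 ls s3://my-test-bucket/topics/students/ --recursive

With flush.size set to 10, a new object appears for roughly every 10 records consumed from each partition.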