Create an Amazon S3 source connector for Aiven for Apache Kafka®
The Amazon S3 source connector allows you to ingest data from S3 buckets into Apache Kafka® topics for real-time processing and analytics.
Prerequisites
- An Aiven for Apache Kafka® service with Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service.
- An Amazon S3 bucket containing the data to stream into Aiven for Apache Kafka.
- Required S3 bucket details:
  - `AWS_S3_BUCKET_NAME`: The bucket name.
  - `AWS_S3_REGION`: The AWS region where the bucket is located. For example, `us-east-1`.
  - `AWS_S3_PREFIX`: Optional. A prefix path if the data is in a specific folder.
  - `AWS_ACCESS_KEY_ID`: The AWS access key ID.
  - `AWS_SECRET_ACCESS_KEY`: The AWS secret access key.
  - `TARGET_KAFKA_TOPIC`: The Apache Kafka topic where the data is published.
- AWS IAM credentials with the following permissions (a sample policy follows this list):
  - `s3:GetObject`
  - `s3:ListBucket`
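A minimal IAM policy granting these permissions might look like the following sketch. The bucket name `my-s3-bucket` is a placeholder: `s3:ListBucket` applies to the bucket itself, while `s3:GetObject` applies to the objects inside it.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowS3SourceConnectorRead",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::my-s3-bucket",
        "arn:aws:s3:::my-s3-bucket/*"
      ]
    }
  ]
}
```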
For additional details on how the connector works, see the S3 source connector documentation.
S3 object key name format
The file.name.template setting defines how the connector extracts metadata from S3
object keys. This setting is required. If it is not set, the connector does not process
any objects.
The configuration depends on how your files are named in S3:
object_hash distribution type
In version 3.4.0 and later, you can match any object key:
- `file.name.template=".*"` to process all object keys, or
- A regular expression to process only keys that match the pattern
partition distribution type
Set `file.name.template` using the following placeholders (see the example after this list):
- `{{topic}}`: The Kafka topic name
- `{{partition}}`: The Kafka partition number
- `{{start_offset}}`: The offset of the first record in the file
- `{{timestamp}}`: Optional. The timestamp when the connector processed the file
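For example, a minimal configuration fragment for the partition distribution type could set the template as follows. The connector name is a placeholder, and the full connector configuration is shown later on this page:

```json
{
  "name": "s3-source-partitioned-keys",
  "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
  "file.name.template": "topic/{{topic}}/partition/{{partition}}/startOffset/{{start_offset}}"
}
```

With this template, an object key such as `topic/customer-topic/partition/1/startOffset/1734445664111.txt` resolves to topic `customer-topic`, partition `1`, and starting offset `1734445664111`, as the table below also shows.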
Backfill and prefix behavior
The connector uses the Amazon S3 ListObjectsV2 API to list objects.
- `aws.s3.prefix` limits processing to keys that begin with the specified prefix
- The connector processes only objects under that prefix
- The connector does not automatically continue to the next prefix
- Objects are processed in the lexicographical order returned by Amazon S3
For time-partitioned folder structures, use a lexicographically sortable pattern such as `YYYY/MM/DD/HH/`.
Use prefix filtering to backfill a specific time window from S3.
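For example, to backfill a single hour from a bucket laid out as `YYYY/MM/DD/HH/`, you could point the connector at that prefix only. The bucket name and prefix below are placeholders for illustration:

```json
{
  "aws.s3.bucket.name": "my-s3-bucket",
  "aws.s3.prefix": "2024/05/01/12/"
}
```

Because the connector does not continue past the configured prefix, backfilling a wider window would mean using a broader prefix or running additional connectors for the other prefixes.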
Example templates and extracted values
The following table shows how different templates extract metadata from S3 object keys:
| Template | Example S3 object key | Extracted values |
|---|---|---|
| `{{topic}}-{{partition}}-{{start_offset}}` | `customer-topic-1-1734445664111.txt` | topic=`customer-topic`, partition=`1`, start_offset=`1734445664111` |
| `{{topic}}-{{partition}}-{{start_offset}}` | `22-10-12/customer-topic-1-1734445664111.txt` | topic=`22`, partition=`10`, start_offset=`112` |
| `{{topic}}/{{partition}}/{{start_offset}}` | `customer-topic/1/1734445664111.txt` | topic=`customer-topic`, partition=`1`, start_offset=`1734445664111` |
| `topic/{{topic}}/partition/{{partition}}/startOffset/{{start_offset}}` | `topic/customer-topic/partition/1/startOffset/1734445664111.txt` | topic=`customer-topic`, partition=`1`, start_offset=`1734445664111` |
Supported S3 object formats
The connector supports four S3 object formats. Choose the one that best fits your data and processing needs:
| Format | Description | Configuration | Example |
|---|---|---|---|
| JSON Lines (`jsonl`) | Each line is a valid JSON object, commonly used for event streaming. | `input.format=jsonl` | `{ "key": "k1", "value": "v0", "offset": 1232155, "timestamp": "2020-01-01T00:00:01Z" }` |
| Avro (`avro`) | A compact, schema-based binary format for efficient serialization. | `input.format=avro` | `{ "type": "record", "fields": [ { "name": "key", "type": "string" }, { "name": "value", "type": "string" }, { "name": "timestamp", "type": "long" } ] }` |
| Parquet (`parquet`) | A columnar format optimized for analytics. | `input.format=parquet` | Uses a schema similar to Avro but optimized for analytics. |
| Bytes (`bytes`, default) | A raw byte stream for unstructured data. | `input.format=bytes` | No predefined structure |
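For instance, a `jsonl` object in S3 contains one JSON record per line. A two-record file using the field names from the table above could look like this sketch:

```jsonl
{"key": "k1", "value": "v0", "offset": 1232155, "timestamp": "2020-01-01T00:00:01Z"}
{"key": "k2", "value": "v1", "offset": 1232156, "timestamp": "2020-01-01T00:00:02Z"}
```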
Acknowledged records and offset tracking
When a record is acknowledged, Apache Kafka confirms receipt but may not immediately write it to the offset topic. If the connector restarts before Apache Kafka updates the offset topic, some records may be duplicated. For details on offset tracking and retry handling, see the S3 source connector documentation.
Create an Amazon S3 source connector configuration file
Create a file named `s3_source_connector.json` and add the following configuration:

```json
{
  "name": "aiven-s3-source-connector",
  "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
  "aws.access.key.id": "YOUR_AWS_ACCESS_KEY_ID",
  "aws.secret.access.key": "YOUR_AWS_SECRET_ACCESS_KEY",
  "aws.s3.bucket.name": "your-s3-bucket-name",
  "aws.s3.region": "your-s3-region",
  "aws.s3.prefix": "optional/prefix/",
  "aws.credentials.provider": "software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain",
  "topic": "your-target-kafka-topic",
  "file.name.template": "{{topic}}-{{partition}}-{{start_offset}}",
  "tasks.max": 1,
  "poll.interval.ms": 10000,
  "error.tolerance": "all",
  "input.format": "jsonl",
  "file.compression.type": "none",
  "timestamp.timezone": "UTC",
  "timestamp.source": "wallclock"
}
```
Parameters:
- `name`: A unique name for the connector.
- `connector.class`: The Java class for the connector.
- `aws.access.key.id` and `aws.secret.access.key`: AWS IAM credentials for authentication.
- `aws.s3.bucket.name`: The name of the S3 bucket containing the source data.
- `aws.s3.region`: The AWS region where the bucket is located.
- `aws.s3.prefix` (optional): Limits processing to keys that begin with this prefix. Only objects under this prefix are processed. The connector does not move to the next prefix.
- `aws.credentials.provider`: Specifies the AWS credentials provider.
- `topic` (optional): The connector publishes ingested data to the specified Apache Kafka topic. If not set, it derives the topic from the `file.name.template` configuration.
- `file.name.template`: Defines how to parse S3 object keys to extract the topic, partition, and starting offset. For example, `{{topic}}-{{partition}}-{{start_offset}}` matches a file name like `test-topic-1-1734445664111.txt`. For the `object_hash` distribution type (version 3.4.0 and later), set `file.name.template` to `.*` to match all keys, or use a regular expression to match specific keys.
- `tasks.max`: The maximum number of tasks that run in parallel.
- `poll.interval.ms`: The polling interval (in milliseconds) for checking the S3 bucket for new files.
- `error.tolerance`: Specifies the error handling mode. `all` logs and ignores errors; `none` fails on errors.
- `input.format`: Specifies the S3 object format. Supported values: `jsonl` (JSON Lines), `avro` (Avro), `parquet` (Parquet), `bytes` (default).
- `timestamp.timezone` (optional): Time zone for timestamps. Default is `UTC`.
- `timestamp.source` (optional): Source of timestamps. Supports `wallclock`. Default is `wallclock`.
- `file.compression.type` (optional): Compression type for input files. Supported values: `gzip`, `snappy`, `zstd`, `none`. Default is `none`.
Create the connector
Aiven Console

1. Access the Aiven Console.
2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
3. Click Connectors.
4. Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.
   Alternatively, to enable connectors:
   1. Click Service settings in the sidebar.
   2. In the Service management section, click Actions > Enable Kafka connect.
5. In the source connectors list, select Amazon S3 source connector, and click Get started.
6. On the Amazon S3 Source Connector page, go to the Common tab.
7. Locate the Connector configuration text box and click Edit.
8. Paste the configuration from your `s3_source_connector.json` file into the text box.
9. Click Create connector.
10. Verify the connector status on the Connectors page.

Aiven CLI
To create the S3 source connector using the Aiven CLI, run:
```bash
avn service connector create SERVICE_NAME @s3_source_connector.json
```
Parameters:
- `SERVICE_NAME`: Name of your Aiven for Apache Kafka service.
- `@s3_source_connector.json`: Path to your JSON configuration file.
To check the connector status, run:
```bash
avn service connector status SERVICE_NAME CONNECTOR_NAME
```
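For example, assuming a service named `kafka-demo` (a placeholder) and the connector name used in the configuration file above, the two commands could look like this:

```bash
# Create the connector from the local configuration file
avn service connector create kafka-demo @s3_source_connector.json

# Check the status of the connector defined in that file
avn service connector status kafka-demo aiven-s3-source-connector
```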
Verify that data flows to the target Apache Kafka topic.
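One way to confirm this is to consume a few records from the target topic with an external Kafka client such as kcat. This is a sketch rather than an Aiven-specific command: the broker address, topic name, and certificate file paths are placeholders for your own service's connection details.

```bash
# Read the target topic from the beginning and exit once caught up
kcat -C \
  -b kafka-demo-myproject.aivencloud.com:12345 \
  -t your-target-kafka-topic \
  -X security.protocol=ssl \
  -X ssl.ca.location=ca.pem \
  -X ssl.certificate.location=service.cert \
  -X ssl.key.location=service.key \
  -o beginning -e
```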
Example: Define and create an Amazon S3 source connector
This example shows how to create an Amazon S3 source connector with the following properties:
- Connector name: `aiven-s3-source-connector`
- Apache Kafka topic: `test-topic`
- AWS region: `us-west-1`
- AWS S3 bucket: `my-s3-bucket`
- File name template: `{{topic}}-{{partition}}-{{start_offset}}`
- Poll interval: 10 seconds
```json
{
  "name": "aiven-s3-source-connector",
  "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
  "tasks.max": 1,
  "topic": "test-topic",
  "aws.access.key.id": "your-access-key-id",
  "aws.secret.access.key": "your-secret-access-key",
  "aws.s3.bucket.name": "my-s3-bucket",
  "aws.s3.region": "us-west-1",
  "aws.s3.prefix": "data-uploads/",
  "file.name.template": "{{topic}}-{{partition}}-{{start_offset}}",
  "poll.interval.ms": 10000,
  "file.compression.type": "none",
  "timestamp.timezone": "UTC",
  "timestamp.source": "wallclock"
}
```
Once this configuration is saved in the `s3_source_connector.json` file, you can create the connector using the Aiven Console or CLI, and verify that data from the Amazon S3 bucket `my-s3-bucket` is ingested into the Apache Kafka topic `test-topic`.