Create an Amazon S3 source connector for Aiven for Apache Kafka®

The Amazon S3 source connector allows you to ingest data from S3 buckets into Apache Kafka® topics for real-time processing and analytics.

Prerequisites

  • An Aiven for Apache Kafka® service with Aiven for Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service.

  • An Amazon S3 bucket containing the data to stream into Aiven for Apache Kafka.

  • Required S3 bucket details:

    • AWS_S3_BUCKET_NAME: The bucket name.
    • AWS_S3_REGION: The AWS region where the bucket is located. For example, us-east-1.
    • AWS_S3_PREFIX: Optional. A prefix path if the data is in a specific folder.
    • AWS_ACCESS_KEY_ID: The AWS access key ID.
    • AWS_SECRET_ACCESS_KEY: The AWS secret access key.
    • TARGET_KAFKA_TOPIC: The Apache Kafka topic where the data is published.
  • AWS IAM credentials with the following permissions:

    • s3:GetObject
    • s3:ListBucket
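
A minimal IAM policy sketch granting these permissions could look like the following; the bucket name in the ARNs is a placeholder:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::your-s3-bucket-name"
    },
    {
      "Effect": "Allow",
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::your-s3-bucket-name/*"
    }
  ]
}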

For additional details on how the connector works, see the S3 source connector documentation.

S3 object key name format

The file.name.template setting defines how the connector extracts metadata from S3 object keys. This setting is required. If it is not set, the connector does not process any objects.

The configuration depends on how your files are named in S3:

object_hash distribution type

In version 3.4.0 and later, you can match any object key:

  • file.name.template=".*" to process all object keys, or
  • A regular expression to process only keys that match the pattern
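
For example, a hypothetical template that processes only .jsonl objects under an uploads/ path (the path and extension are assumptions about your naming scheme, not required values):

file.name.template="uploads/.*\.jsonl"

In the JSON connector configuration, escape the backslash as \\.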

partition distribution type

Set file.name.template using the following placeholders:

  • {{topic}}: The Kafka topic name
  • {{partition}}: The Kafka partition number
  • {{start_offset}}: The offset of the first record in the file
  • {{timestamp}}: Optional. The timestamp when the connector processed the file

Backfill and prefix behavior

The connector uses the Amazon S3 ListObjectsV2 API to list objects.

  • aws.s3.prefix limits processing to keys that begin with the specified prefix
  • The connector processes only objects under that prefix
  • The connector does not automatically continue to the next prefix
  • Objects are processed in the lexicographical order returned by Amazon S3

For time-partitioned folder structures, use a lexicographically sortable pattern such as:

YYYY/MM/DD/HH/

Note: Use prefix filtering to backfill a specific time window from S3.
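
For example, assuming objects are stored under the YYYY/MM/DD/HH/ layout shown above, a hypothetical prefix setting that backfills a single hour could be:

"aws.s3.prefix": "2024/06/15/10/"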

Example templates and extracted values

The following table shows how different templates extract metadata from S3 object keys:

| Template | Example S3 object key | Extracted values |
| --- | --- | --- |
| {{topic}}-{{partition}}-{{start_offset}} | customer-topic-1-1734445664111.txt | topic=customer-topic, partition=1, start_offset=1734445664111 |
| {{topic}}-{{partition}}-{{start_offset}} | 22-10-12/customer-topic-1-1734445664111.txt | topic=22, partition=10, start_offset=112 |
| {{topic}}/{{partition}}/{{start_offset}} | customer-topic/1/1734445664111.txt | topic=customer-topic, partition=1, start_offset=1734445664111 |
| topic/{{topic}}/partition/{{partition}}/startOffset/{{start_offset}} | topic/customer-topic/partition/1/startOffset/1734445664111.txt | topic=customer-topic, partition=1, start_offset=1734445664111 |

Supported S3 object formats

The connector supports four S3 object formats. Choose the one that best fits your data and processing needs:

| Format | Description | Configuration | Example |
| --- | --- | --- | --- |
| JSON Lines (jsonl) | Each line is a valid JSON object, commonly used for event streaming. | input.format=jsonl | { "key": "k1", "value": "v0", "offset": 1232155, "timestamp": "2020-01-01T00:00:01Z" } |
| Avro (avro) | A compact, schema-based binary format for efficient serialization. | input.format=avro | { "type": "record", "fields": [ { "name": "key", "type": "string" }, { "name": "value", "type": "string" }, { "name": "timestamp", "type": "long" } ] } |
| Parquet (parquet) | A columnar format optimized for analytics. | input.format=parquet | Uses a schema similar to Avro but optimized for analytics. |
| Bytes (bytes, default) | A raw byte stream for unstructured data. | input.format=bytes | No predefined structure |
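
As an illustration, a single S3 object in jsonl format holds one JSON record per line. The first line below comes from the table above; the second is an additional sample record:

{ "key": "k1", "value": "v0", "offset": 1232155, "timestamp": "2020-01-01T00:00:01Z" }
{ "key": "k2", "value": "v1", "offset": 1232156, "timestamp": "2020-01-01T00:00:02Z" }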

Acknowledged records and offset tracking

When a record is acknowledged, Apache Kafka confirms receipt but may not immediately write it to the offset topic. If the connector restarts before Apache Kafka updates the offset topic, some records may be duplicated. For details on offset tracking and retry handling, see the S3 source connector documentation.

Create an Amazon S3 source connector configuration file

Create a file named s3_source_connector.json and add the following configuration:

{
  "name": "aiven-s3-source-connector",
  "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
  "aws.access.key.id": "YOUR_AWS_ACCESS_KEY_ID",
  "aws.secret.access.key": "YOUR_AWS_SECRET_ACCESS_KEY",
  "aws.s3.bucket.name": "your-s3-bucket-name",
  "aws.s3.region": "your-s3-region",
  "aws.s3.prefix": "optional/prefix/",
  "aws.credentials.provider": "software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain",
  "topic": "your-target-kafka-topic",
  "file.name.template": "{{topic}}-{{partition}}-{{start_offset}}",
  "tasks.max": 1,
  "poll.interval.ms": 10000,
  "errors.tolerance": "all",
  "input.format": "jsonl",
  "file.compression.type": "none",
  "timestamp.timezone": "UTC",
  "timestamp.source": "wallclock"
}

Parameters:

  • name: A unique name for the connector.
  • connector.class: The Java class for the connector.
  • aws.access.key.id and aws.secret.access.key: AWS IAM credentials for authentication.
  • aws.s3.bucket.name: The name of the S3 bucket containing the source data.
  • aws.s3.region: The AWS region where the bucket is located.
  • aws.s3.prefix (optional): Limits processing to keys that begin with this prefix. Only objects under this prefix are processed; the connector does not move on to the next prefix.
  • aws.credentials.provider: Specifies the AWS credentials provider.
  • topic (optional): The connector publishes ingested data to the specified Apache Kafka topic. If not set, the topic is derived from the file.name.template configuration.
  • file.name.template: Defines how to parse S3 object keys to extract the topic, partition, and starting offset. For example, {{topic}}-{{partition}}-{{start_offset}} matches a file name like test-topic-1-1734445664111.txt. For the object_hash distribution type (version 3.4.0+), set file.name.template to .* to match all keys or use a regular expression to match specific keys.
  • tasks.max: The maximum number of tasks that run in parallel.
  • poll.interval.ms: The polling interval (in milliseconds) for checking the S3 bucket for new files.
  • errors.tolerance: Specifies the error handling mode.
    • all: Logs and ignores errors.
    • none: Fails on errors.
  • input.format: Specifies the S3 object format. Supported values:
    • jsonl (JSON Lines)
    • avro (Avro)
    • parquet (Parquet)
    • bytes (default)
  • timestamp.timezone (optional): Time zone for timestamps. Default is UTC.
  • timestamp.source (optional): Source of timestamps. Supports wallclock. Default is wallclock.
  • file.compression.type (optional): Compression type for input files. Supported values: gzip, snappy, zstd, none. Default is none.

Create the connector

  1. Access the Aiven Console.

  2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.

  3. Click Connectors.

  4. Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.

    Alternatively, to enable connectors:

    1. Click Service settings in the sidebar.
    2. In the Service management section, click Actions > Enable Kafka Connect.
  5. In the source connectors list, select Amazon S3 source connector, and click Get started.

  6. On the Amazon S3 Source Connector page, go to the Common tab.

  7. Locate the Connector configuration text box and click Edit.

  8. Paste the configuration from your s3_source_connector.json file into the text box.

  9. Click Create connector.

  10. Verify the connector status on the Connectors page.

Example: Define and create an Amazon S3 source connector

This example shows how to create an Amazon S3 source connector with the following properties:

  • Connector name: aiven-s3-source-connector
  • Apache Kafka topic: test-topic
  • AWS region: us-west-1
  • AWS S3 bucket: my-s3-bucket
  • File name template: {{topic}}-{{partition}}-{{start_offset}}
  • Poll interval: 10 seconds

{
  "name": "aiven-s3-source-connector",
  "connector.class": "io.aiven.kafka.connect.s3.source.S3SourceConnector",
  "tasks.max": 1,
  "topic": "test-topic",
  "aws.access.key.id": "your-access-key-id",
  "aws.secret.access.key": "your-secret-access-key",
  "aws.s3.bucket.name": "my-s3-bucket",
  "aws.s3.region": "us-west-1",
  "aws.s3.prefix": "data-uploads/",
  "file.name.template": "{{topic}}-{{partition}}-{{start_offset}}",
  "poll.interval.ms": 10000,
  "file.compression.type": "none",
  "timestamp.timezone": "UTC",
  "timestamp.source": "wallclock"
}

Once this configuration is saved in the s3_source_connector.json file, you can create the connector using the Aiven Console or CLI, and verify that data from the Amazon S3 bucket my-s3-bucket is successfully ingested into the Apache Kafka topic test-topic.
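
For example, with the Aiven CLI you could create the connector from the file and then check its status. This is a sketch; KAFKA_SERVICE_NAME is a placeholder for your Aiven for Apache Kafka service name:

# Create the connector from the configuration file
avn service connector create KAFKA_SERVICE_NAME @s3_source_connector.json

# Check the connector status
avn service connector status KAFKA_SERVICE_NAME aiven-s3-source-connector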
