Configure the Iceberg sink connector with AWS Glue REST catalog
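The AWS Glue REST catalog manages Apache Iceberg table metadata in AWS Glue through the Iceberg REST catalog API. The Iceberg sink connector communicates with this REST endpoint to write Apache Kafka data to Iceberg tables registered in AWS Glue.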
Prerequisites
- An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service.
- Apache Kafka client settings: The Iceberg sink connector requires these settings to connect to the Iceberg control topic. For a full list of supported configurations, see Iceberg configuration.
- AWS-specific setup:
  - Create an S3 bucket to store data.
  - Configure an AWS IAM user with the appropriate permissions. See Configure AWS IAM permissions.
  - Create an AWS Glue database and tables:
    - If you use the AWS Glue REST catalog, create the tables manually. Follow the AWS Glue naming conventions, select Apache Iceberg table as the table type, and make sure the table definition matches the schema of your Apache Kafka records.
    - Specify the S3 bucket as the storage location. For more details, see the AWS Glue Data Catalog documentation.
Configure AWS IAM permissions
The Iceberg sink connector requires an IAM user with permissions to access Amazon S3 and AWS Glue. These permissions allow the connector to write data to an S3 bucket and manage metadata in the AWS Glue catalog.
To set up the required permissions:
- Create an IAM user in AWS Identity and Access Management (IAM) with permissions for Amazon S3 and AWS Glue.
- Attach the following policy to the IAM user:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3Access",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:AbortMultipartUpload",
"s3:ListMultipartUploadParts"
],
"Resource": [
"arn:aws:s3:::<your-bucket-name>/*"
]
},
{
"Sid": "S3ListBucket",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::<your-bucket-name>"
]
},
{
"Sid": "GlueAccess",
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:GetDatabase",
"glue:GetTables",
"glue:SearchTables",
"glue:CreateTable",
"glue:UpdateTable",
"glue:GetTable",
"glue:BatchCreatePartition",
"glue:CreatePartition",
"glue:UpdatePartition",
"glue:GetPartition",
"glue:GetPartitions"
],
"Resource": [
"arn:aws:glue:<your-aws-region>:<your-aws-account>:catalog",
"arn:aws:glue:<your-aws-region>:<your-aws-account>:database/*",
"arn:aws:glue:<your-aws-region>:<your-aws-account>:table/*"
]
}
]
}
Replace the placeholder values in the policy:
- <your-aws-region>: The AWS region of your AWS Glue Data Catalog
- <your-aws-account>: Your AWS account ID
- <your-bucket-name>: The name of your Amazon S3 bucket
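- Obtain the access key ID and secret access key for the IAM user.
- Add these credentials to the Iceberg sink connector configuration in the iceberg.catalog.rest.access-key-id, iceberg.catalog.rest.secret-access-key, iceberg.catalog.s3.access-key-id, and iceberg.catalog.s3.secret-access-key properties.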
For more information on creating and managing AWS IAM users and policies, see the AWS IAM documentation.
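If you generate the policy from a script instead of editing the placeholders by hand, the following Python sketch builds a policy with the three statements listed above. It keeps the bucket-level actions (s3:ListBucket, s3:GetBucketLocation) on the bucket ARN and the object-level actions on the object ARN. The function name build_iceberg_sink_policy and the sample values are hypothetical; the sketch only produces the JSON document and does not call AWS.

```python
import json


def build_iceberg_sink_policy(bucket: str, region: str, account_id: str) -> dict:
    """Return the IAM policy for the Iceberg sink connector (hypothetical helper)."""
    glue_arn = f"arn:aws:glue:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Object-level actions apply to the objects inside the bucket.
                "Sid": "S3Access",
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts",
                ],
                "Resource": [f"arn:aws:s3:::{bucket}/*"],
            },
            {
                # Bucket-level actions apply to the bucket ARN itself.
                "Sid": "S3ListBucket",
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": [f"arn:aws:s3:::{bucket}"],
            },
            {
                # Glue actions copied from the policy on this page.
                "Sid": "GlueAccess",
                "Effect": "Allow",
                "Action": [
                    "glue:CreateDatabase",
                    "glue:GetDatabase",
                    "glue:GetTables",
                    "glue:SearchTables",
                    "glue:CreateTable",
                    "glue:UpdateTable",
                    "glue:GetTable",
                    "glue:BatchCreatePartition",
                    "glue:CreatePartition",
                    "glue:UpdatePartition",
                    "glue:GetPartition",
                    "glue:GetPartitions",
                ],
                "Resource": [
                    f"{glue_arn}:catalog",
                    f"{glue_arn}:database/*",
                    f"{glue_arn}:table/*",
                ],
            },
        ],
    }
```

For example, print(json.dumps(build_iceberg_sink_policy("my-bucket", "us-west-1", "123456789012"), indent=2)) prints a policy you can paste into the IAM console.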
AWS Glue naming conventions
When creating databases and tables in AWS Glue for the Iceberg sink connector, follow these naming conventions to ensure compatibility:
- Database names:
  - Use only lowercase letters (a-z), numbers (0-9), and underscores (_).
  - Must be between 1 and 252 characters long.
  - Examples:
    - Valid: sales_data, customer_orders_2024
    - Invalid: SalesData, customer orders
- Table names:
  - Use only lowercase letters, numbers, and underscores.
  - Must be between 1 and 255 characters long.
  - Examples:
    - Valid: product_catalog, order_history_2023
    - Invalid: ProductCatalog, order-history
- Column names: AWS Glue has minimal restrictions on column names. For best compatibility, use only letters, numbers, and underscores.
For more details, see the AWS Athena naming conventions.
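The character and length rules above can be checked before you create a table or set iceberg.tables, which expects a value in DATABASE_NAME.TABLE_NAME format. This Python sketch encodes only the rules listed in this section, not the full AWS validation logic; is_valid_glue_name and check_iceberg_table are hypothetical helper names.

```python
import re

# Character rule from the naming conventions above: lowercase letters, digits, underscores.
_ALLOWED = re.compile(r"[a-z0-9_]+")

# Maximum lengths as listed in this section.
MAX_DATABASE_NAME = 252
MAX_TABLE_NAME = 255


def is_valid_glue_name(name: str, max_length: int) -> bool:
    """Return True if name uses only allowed characters and fits the length limit."""
    return 1 <= len(name) <= max_length and _ALLOWED.fullmatch(name) is not None


def check_iceberg_table(qualified: str) -> list:
    """Validate a DATABASE_NAME.TABLE_NAME value; return a list of problems (empty if none)."""
    database, sep, table = qualified.partition(".")
    if not sep:
        return [f"{qualified!r} is not in DATABASE_NAME.TABLE_NAME format"]
    problems = []
    if not is_valid_glue_name(database, MAX_DATABASE_NAME):
        problems.append(f"invalid database name: {database!r}")
    # A second dot ends up in the table part and fails the character rule.
    if not is_valid_glue_name(table, MAX_TABLE_NAME):
        problems.append(f"invalid table name: {table!r}")
    return problems
```

With the example names from this section, check_iceberg_table("sales_data.product_catalog") returns an empty list, while check_iceberg_table("SalesData.order-history") reports both names.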
Create an Iceberg sink connector configuration
To configure the Iceberg sink connector, create a JSON configuration file, such as iceberg_sink_connector.json, with the settings for the AWS Glue REST catalog.
Loading worker properties is not supported yet. Use iceberg.kafka.* properties instead.
- Create the AWS resources, including an S3 bucket, a Glue database, and tables.
  Note: The AWS Glue REST catalog does not support automatic table creation. Manually create the tables in AWS Glue and make sure their schema matches the Apache Kafka data.
- Add the following configuration to the Iceberg sink connector:
{
"name": "CONNECTOR_NAME",
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "KAFKA_TOPICS",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"consumer.override.auto.offset.reset": "earliest",
"iceberg.kafka.auto.offset.reset": "earliest",
"iceberg.tables": "DATABASE_NAME.TABLE_NAME",
"iceberg.tables.auto-create-enabled": "false",
"iceberg.control.topic": "ICEBERG_CONTROL_TOPIC_NAME",
"iceberg.control.commit.interval-ms": "1000",
"iceberg.control.commit.timeout-ms": "60000",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://glue.AWS_REGION.amazonaws.com/iceberg",
"iceberg.catalog.warehouse": "AWS_ACCOUNT_ID",
"iceberg.catalog.client.region": "AWS_REGION",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.rest.signing-name": "glue",
"iceberg.catalog.rest.signing-region": "AWS_REGION",
"iceberg.catalog.rest.sigv4-enabled": "true",
"iceberg.catalog.rest.access-key-id": "AWS_ACCESS_KEY_ID",
"iceberg.catalog.rest.secret-access-key": "AWS_SECRET_ACCESS_KEY",
"iceberg.catalog.rest-metrics-reporting-enabled": "false",
"iceberg.catalog.s3.access-key-id": "AWS_ACCESS_KEY_ID",
"iceberg.catalog.s3.secret-access-key": "AWS_SECRET_ACCESS_KEY",
"iceberg.catalog.s3.path-style-access": "true",
"iceberg.kafka.bootstrap.servers": "KAFKA_HOST:KAFKA_PORT",
"iceberg.kafka.security.protocol": "SSL",
"iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
"iceberg.kafka.ssl.keystore.password": "KEYSTORE_PASSWORD",
"iceberg.kafka.ssl.keystore.type": "PKCS12",
"iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
"iceberg.kafka.ssl.truststore.password": "TRUSTSTORE_PASSWORD",
"iceberg.kafka.ssl.key.password": "KEY_PASSWORD"
}
Parameters
- name: Specify the connector name.
- connector.class: Define the connector class. Use org.apache.iceberg.connect.IcebergSinkConnector.
- tasks.max: Define the maximum number of tasks the connector can run.
- topics: List the Apache Kafka topics containing data for the Iceberg tables.
- key.converter: Set the key converter class. Use org.apache.kafka.connect.json.JsonConverter for JSON data.
- value.converter: Set the value converter class. Use org.apache.kafka.connect.json.JsonConverter for JSON data.
- key.converter.schemas.enable: Enable (true) or disable (false) schema support for the key converter.
- value.converter.schemas.enable: Enable (true) or disable (false) schema support for the value converter.
- consumer.override.auto.offset.reset: Set the Apache Kafka consumer offset reset policy. Options: earliest (consume from the beginning) or latest (consume only new messages).
- iceberg.kafka.auto.offset.reset: Set the offset reset policy for the Apache Kafka consumer that reads the Iceberg control topic.
- iceberg.tables: Define the target Iceberg table in DATABASE_NAME.TABLE_NAME format.
- iceberg.tables.auto-create-enabled: Enable (true) or disable (false) automatic table creation. The AWS Glue REST catalog does not support automatic table creation, so set this to false.
- iceberg.control.topic: Set the Apache Kafka topic that the connector uses to coordinate commits. Default: control-iceberg.
- iceberg.control.commit.interval-ms: Define how often (in milliseconds) the connector commits data to Iceberg tables. Default: 300000 (5 minutes). The configuration above uses 1000 (1 second).
- iceberg.control.commit.timeout-ms: Set the maximum time (in milliseconds) to wait for a commit before timing out. Default: 30000 (30 seconds).
- iceberg.catalog.type: Specify the Iceberg catalog type. Use rest for the AWS Glue REST catalog.
- iceberg.catalog.uri: Set the URI of the Iceberg REST catalog endpoint.
- iceberg.catalog.warehouse: Set your AWS account ID when using the REST catalog.
- iceberg.catalog.client.region: Set the AWS region for Iceberg catalog operations.
- iceberg.catalog.io-impl: Specify the file I/O implementation. Use org.apache.iceberg.aws.s3.S3FileIO for Amazon S3.
- iceberg.catalog.rest.signing-name: Specify the AWS service name used to sign requests (for example, glue).
- iceberg.catalog.rest.signing-region: Set the AWS region used to sign requests.
- iceberg.catalog.rest.sigv4-enabled: Enable (true) or disable (false) AWS SigV4 authentication for REST requests. Deprecated in version 1.8 in favor of iceberg.catalog.rest.auth.type.
- iceberg.catalog.rest.auth.type: Set the authentication method for REST requests: basic (HTTP credentials), sigv4 (AWS access key), or oauth2 (OAuth 2.0 token). Introduced in version 1.8.
- iceberg.catalog.rest.access-key-id: Set the AWS access key ID for REST catalog authentication.
- iceberg.catalog.rest.secret-access-key: Set the AWS secret access key for REST catalog authentication.
- iceberg.catalog.rest-metrics-reporting-enabled: Enable (true) or disable (false) metrics reporting for the Iceberg REST catalog.
- iceberg.catalog.s3.access-key-id: Set the AWS access key ID for Amazon S3 authentication.
- iceberg.catalog.s3.secret-access-key: Set the AWS secret access key for Amazon S3 authentication.
- iceberg.catalog.s3.path-style-access: Enable (true) or disable (false) path-style access for S3 buckets.
- iceberg.kafka.bootstrap.servers: Define the Apache Kafka broker connection details in KAFKA_HOST:KAFKA_PORT format.
- iceberg.kafka.security.protocol: Define the security protocol. Use SSL for encrypted communication.
- iceberg.kafka.ssl.keystore.location: Specify the file path to the keystore containing the SSL certificate.
- iceberg.kafka.ssl.keystore.password: Set the password for the keystore.
- iceberg.kafka.ssl.keystore.type: Set the keystore type (for example, PKCS12).
- iceberg.kafka.ssl.truststore.location: Specify the file path to the truststore containing trusted SSL certificates.
- iceberg.kafka.ssl.truststore.password: Set the password for the truststore.
- iceberg.kafka.ssl.key.password: Set the password to access the private key stored in the keystore.
Apache Kafka security settings are the same for both AWS Glue REST and AWS Glue catalog configurations.
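The template repeats AWS_REGION in three places and the access key pair in two, so a script can fill these placeholders from one set of values. The Python sketch below is hypothetical (build_glue_rest_config is not part of any Aiven or Iceberg tooling): it derives the catalog URI and both region settings from a single region argument and leaves the converter, control topic, and iceberg.kafka.* settings to the extra argument.

```python
import json
from typing import Optional


def build_glue_rest_config(
    *,
    name: str,
    topics: str,
    table: str,
    region: str,
    account_id: str,
    access_key_id: str,
    secret_access_key: str,
    extra: Optional[dict] = None,
) -> dict:
    """Fill the AWS-specific placeholders of the template (hypothetical helper)."""
    config = {
        "name": name,
        "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "2",
        "topics": topics,
        "iceberg.tables": table,
        # The AWS Glue REST catalog does not support automatic table creation.
        "iceberg.tables.auto-create-enabled": "false",
        "iceberg.catalog.type": "rest",
        # One region value feeds the endpoint URI and both region settings.
        "iceberg.catalog.uri": f"https://glue.{region}.amazonaws.com/iceberg",
        "iceberg.catalog.warehouse": account_id,
        "iceberg.catalog.client.region": region,
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "iceberg.catalog.rest.signing-name": "glue",
        "iceberg.catalog.rest.signing-region": region,
        "iceberg.catalog.rest.sigv4-enabled": "true",
        # The same IAM user credentials are used for the catalog and for S3.
        "iceberg.catalog.rest.access-key-id": access_key_id,
        "iceberg.catalog.rest.secret-access-key": secret_access_key,
        "iceberg.catalog.s3.access-key-id": access_key_id,
        "iceberg.catalog.s3.secret-access-key": secret_access_key,
    }
    # Remaining settings (converters, control topic, iceberg.kafka.* client settings).
    config.update(extra or {})
    return config
```

You could then save the result as iceberg_sink_connector.json, for example with json.dump(config, f, indent=2), and use that file when creating the connector.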
Create the Iceberg sink connector
To create the connector in the Aiven Console:
- Access the Aiven Console.
- Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
- Click Connectors.
- Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.
  Alternatively, to enable connectors:
  - Click Service settings in the sidebar.
  - In the Service management section, click Actions > Enable Kafka connect.
- In the sink connectors list, select Iceberg Sink Connector, and click Get started.
- On the Iceberg Sink Connector page, go to the Common tab.
- Locate the Connector configuration text box and click Edit.
- Paste the configuration from your iceberg_sink_connector.json file into the text box.
- Click Create connector.
- Verify the connector status on the Connectors page.
To create the Iceberg sink connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @iceberg_sink_connector.json
Parameters:
- SERVICE_NAME: Name of your Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service.
- @iceberg_sink_connector.json: Path to the JSON configuration file, prefixed with @.
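Before running the command, it can help to confirm that the file parses as JSON and contains the keys you expect. The following Python sketch is one way to do that; the preflight function and its REQUIRED_KEYS list are hypothetical choices based on the configuration template on this page, not a schema enforced by Aiven or Apache Kafka Connect.

```python
import json
from pathlib import Path

# Assumption: keys this sketch treats as required, taken from the template on this page.
REQUIRED_KEYS = (
    "name",
    "connector.class",
    "topics",
    "iceberg.tables",
    "iceberg.catalog.type",
    "iceberg.catalog.uri",
    "iceberg.catalog.warehouse",
    "iceberg.kafka.bootstrap.servers",
)


def preflight(path: str) -> list:
    """Return a list of problems found in the connector configuration file."""
    try:
        config = json.loads(Path(path).read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        return [f"cannot read {path}: {exc}"]
    if not isinstance(config, dict):
        return [f"{path} must contain a JSON object"]
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]
    if "iceberg.catalog.type" in config and config["iceberg.catalog.type"] != "rest":
        problems.append("iceberg.catalog.type must be rest for the AWS Glue REST catalog")
    # Assumption: a missing auto-create key is treated as disabled.
    if str(config.get("iceberg.tables.auto-create-enabled", "false")).lower() != "false":
        problems.append("the AWS Glue REST catalog does not support automatic table creation")
    return problems
```

If preflight("iceberg_sink_connector.json") returns an empty list, run the avn service connector create command with the same file.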
Example
This example shows how to create an Iceberg sink connector that uses the AWS Glue REST catalog, with the following properties:
- Connector name: iceberg_sink_rest
- Apache Kafka topic: test-topic
- AWS account ID: your-aws-account-id
- AWS Glue region: us-west-1
- AWS IAM access key ID: your-access-key-id
- AWS IAM secret access key: your-secret-access-key
- Target table: mydatabase.mytable
- Commit interval: 1000 ms
- Tasks: 2
{
"name": "iceberg_sink_rest",
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "test-topic",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://glue.us-west-1.amazonaws.com/iceberg",
"iceberg.catalog.rest.signing-name": "glue",
"iceberg.catalog.rest.signing-region": "us-west-1",
"iceberg.catalog.rest.sigv4-enabled": "true",
"iceberg.catalog.rest.access-key-id": "your-access-key-id",
"iceberg.catalog.rest.secret-access-key": "your-secret-access-key",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.client.region": "us-west-1",
"iceberg.catalog.s3.access-key-id": "your-access-key-id",
"iceberg.catalog.s3.secret-access-key": "your-secret-access-key",
"iceberg.catalog.warehouse": "your-aws-account-id",
"iceberg.tables": "mydatabase.mytable",
"iceberg.tables.auto-create-enabled": "false",
"iceberg.control.commit.interval-ms": "1000",
"iceberg.control.commit.timeout-ms": "60000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"iceberg.kafka.bootstrap.servers": "kafka.example.com:9092",
"iceberg.kafka.security.protocol": "SSL",
"iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
"iceberg.kafka.ssl.keystore.password": "password",
"iceberg.kafka.ssl.keystore.type": "PKCS12",
"iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
"iceberg.kafka.ssl.truststore.password": "password",
"iceberg.kafka.ssl.key.password": "password"
}
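In this example, the region appears in the catalog URI and in iceberg.catalog.rest.signing-region, and the two values should agree so that SigV4-signed requests match the AWS Glue endpoint they are sent to. The Python sketch below checks that; uri_region and region_mismatches are hypothetical helpers, and the sketch assumes the glue.REGION.amazonaws.com endpoint format used on this page.

```python
from typing import Optional
from urllib.parse import urlparse

# Keys that this sketch expects to carry the same AWS region as the catalog URI.
REGION_KEYS = ("iceberg.catalog.rest.signing-region", "iceberg.catalog.client.region")


def uri_region(uri: str) -> Optional[str]:
    """Extract REGION from https://glue.REGION.amazonaws.com/iceberg, or return None."""
    parts = (urlparse(uri).hostname or "").split(".")
    if len(parts) == 4 and parts[0] == "glue" and parts[2:] == ["amazonaws", "com"]:
        return parts[1]
    return None


def region_mismatches(config: dict) -> list:
    """List region keys whose value differs from the region in iceberg.catalog.uri."""
    expected = uri_region(config.get("iceberg.catalog.uri", ""))
    return [key for key in REGION_KEYS if key in config and config[key] != expected]
```

Applied to the example configuration above, region_mismatches returns an empty list, because every region setting in it uses us-west-1.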