Configure the Iceberg sink connector with Snowflake Open Catalog
Snowflake Open Catalog is a managed Apache Polaris™ service that supports the Apache Iceberg™ REST catalog API and uses Amazon S3 for storage.
Use the Iceberg sink connector in Aiven for Apache Kafka® Connect to write data to Iceberg tables managed by Snowflake Open Catalog.
Prerequisites
- A Snowflake Open Catalog account
- A Polaris catalog created using Amazon S3 as the storage backend
- An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service
- An Apache Kafka topic that contains data to sink
- An Apache Kafka control topic (default:
control-iceberg
) - AWS S3 credentials and a configured S3 bucket
- Polaris authentication credentials
iceberg.catalog.scope
: A role scope defined in the Open Catalogiceberg.catalog.credential
: A token or hash for authenticating with the catalog
Snowflake Open Catalog does not support automatic catalog creation. Create the catalog manually in the Snowflake console.
Create an Iceberg sink connector configuration
To configure the Iceberg sink connector, define a JSON configuration with the required catalog, S3, and Kafka settings.
Loading worker properties is not supported. Use iceberg.kafka.*
properties instead.
{
"name": "CONNECTOR_NAME",
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "KAFKA_TOPICS",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"consumer.override.auto.offset.reset": "earliest",
"iceberg.kafka.auto.offset.reset": "earliest",
"iceberg.kafka.bootstrap.servers": "KAFKA_HOST:KAFKA_PORT",
"iceberg.kafka.security.protocol": "SSL",
"iceberg.kafka.ssl.key.password": "KEY_PASSWORD",
"iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
"iceberg.kafka.ssl.keystore.password": "KEYSTORE_PASSWORD",
"iceberg.kafka.ssl.keystore.type": "PKCS12",
"iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
"iceberg.kafka.ssl.truststore.password": "TRUSTSTORE_PASSWORD",
"iceberg.tables": "DATABASE.TABLE",
"iceberg.tables.auto-create-enabled": "true",
"iceberg.control.commit.interval-ms": "1000",
"iceberg.control.commit.timeout-ms": "20000",
"iceberg.control.topic": "CONTROL_TOPIC",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://SNOWFLAKE_ACCOUNT_ID.AWS_REGION.snowflakecomputing.com/polaris/api/catalog",
"iceberg.catalog.scope": "PRINCIPAL_ROLE:ROLE_NAME",
"iceberg.catalog.credential": "CATALOG_CREDENTIAL",
"iceberg.catalog.warehouse": "CATALOG_NAME",
"iceberg.catalog.client.region": "AWS_REGION",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.s3.access-key-id": "AWS_ACCESS_KEY_ID",
"iceberg.catalog.s3.secret-access-key": "AWS_SECRET_ACCESS_KEY",
"iceberg.catalog.s3.path-style-access": "true"
}
Parameters:
name
: Name of the connectorconnector.class
: Set toorg.apache.iceberg.connect.IcebergSinkConnector
tasks.max
: Maximum number of tasks the connector can run in paralleltopics
: Apache Kafka topics to read data fromkey.converter
: Useorg.apache.kafka.connect.json.JsonConverter
value.converter
: Useorg.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable
: Enable (true
) or disable (false
) schema support in the key convertervalue.converter.schemas.enable
: Enable (true
) or disable (false
) schema support in the value converterconsumer.override.auto.offset.reset
: Kafka consumer offset reset policy (recommended:earliest
)iceberg.kafka.auto.offset.reset
: Offset reset policy for the Iceberg internal Apache Kafka consumericeberg.kafka.bootstrap.servers
: Apache Kafka broker address inKAFKA_HOST:KAFKA_PORT
formaticeberg.kafka.security.protocol
: UseSSL
for secure communicationiceberg.kafka.ssl.keystore.location
: File path to the SSL keystoreiceberg.kafka.ssl.keystore.password
: Keystore passwordiceberg.kafka.ssl.keystore.type
: UsePKCS12
iceberg.kafka.ssl.truststore.location
: File path to the truststoreiceberg.kafka.ssl.truststore.password
: Truststore passwordiceberg.kafka.ssl.key.password
: Password for the SSL private keyiceberg.tables
: Iceberg table name inDATABASE.TABLE
formaticeberg.tables.auto-create-enabled
: Enable (true
) or disable (false
) automatic table creationiceberg.control.commit.interval-ms
: Frequency (in ms) to commit data to Icebergiceberg.control.commit.timeout-ms
: Max time (in ms) to wait for a commiticeberg.control.topic
: Control topic used by Iceberg (default:control-iceberg
)iceberg.catalog.type
: Set torest
for Snowflake Open Catalogiceberg.catalog.uri
: Polaris REST catalog endpoint (from the Snowflake Open Catalog console)iceberg.catalog.scope
: Role scope in the formatPRINCIPAL_ROLE:ROLE_NAME
iceberg.catalog.credential
: Authentication credential for Snowflake Open Catalog Use the formatCLIENT_ID:CLIENT_SECRET
from the configured service connection that uses a Principal roleiceberg.catalog.warehouse
: Name of the catalog created in Snowflake Open Catalog. This is not the S3 bucket nameiceberg.catalog.client.region
: AWS region of the S3 bucketiceberg.catalog.io-impl
: Set toorg.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.s3.access-key-id
: AWS access key ID with write permissionsiceberg.catalog.s3.secret-access-key
: AWS secret access keyiceberg.catalog.s3.path-style-access
: Enable (true
) or disable (false
) path-style access
Create the Iceberg sink connector
- Aiven Console
- Aiven CLI
- Access the Aiven Console.
- Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
- Click Connectors.
- Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.
Alternatively, to enable connectors:
-
Click Service settings in the sidebar.
-
In the Service management section, click Actions > Enable Kafka connect.
-
In the sink connectors list, select Iceberg Sink Connector, and click Get started.
-
On the Iceberg Sink Connector page, go to the Common tab.
-
Locate the Connector configuration text box and click Edit.
-
Paste the configuration from your
iceberg_sink_connector.json
file into the text box. -
Click Create connector.
-
Verify the connector status on the Connectors page.
To create the Iceberg sink connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @iceberg_sink_connector.json
Parameters:
SERVICE_NAME
: Name of your Aiven for Apache Kafka® service.@iceberg_sink_connector.json
: Path to the JSON configuration file.
Example
This example shows a complete connector configuration that uses Snowflake Open Catalog (Polaris) as the catalog and Amazon S3 for storage.
{
"name": "iceberg_sink_polaris",
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "test-topic",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://1234567890.us-east-1.snowflakecomputing.com/polaris/api/catalog",
"iceberg.catalog.scope": "PRINCIPAL_ROLE:my-role",
"iceberg.catalog.credential": "my-token",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.s3.access-key-id": "your-access-key-id",
"iceberg.catalog.s3.secret-access-key": "your-secret-access-key",
"iceberg.catalog.warehouse": "my-bucket",
"iceberg.tables": "mydatabase.mytable",
"iceberg.tables.auto-create-enabled": "true",
"iceberg.control.commit.interval-ms": "1000",
"iceberg.control.commit.timeout-ms": "20000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"iceberg.kafka.bootstrap.servers": "kafka.example.com:9092",
"iceberg.kafka.security.protocol": "SSL",
"iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
"iceberg.kafka.ssl.keystore.password": "password",
"iceberg.kafka.ssl.keystore.type": "PKCS12",
"iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
"iceberg.kafka.ssl.truststore.password": "password",
"iceberg.kafka.ssl.key.password": "password"
}
Related pages