Configure the Iceberg sink connector with Snowflake Open Catalog

Snowflake Open Catalog is a managed Apache Polaris™ service that supports the Apache Iceberg™ REST catalog API and uses Amazon S3 for storage.

Use the Iceberg sink connector in Aiven for Apache Kafka® Connect to write data to Iceberg tables managed by Snowflake Open Catalog.

Prerequisites

  • An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect service
  • A catalog created in Snowflake Open Catalog with an Amazon S3 bucket as its storage location
  • A service connection in Snowflake Open Catalog with a client ID, a client secret, and a Principal role
  • AWS credentials with write access to the S3 bucket

note

Snowflake Open Catalog does not support automatic catalog creation. Create the catalog manually in the Snowflake console.

Create an Iceberg sink connector configuration

To configure the Iceberg sink connector, define a JSON configuration with the required catalog, S3, and Kafka settings, and save it to a file, for example iceberg_sink_connector.json.

note

Loading Kafka Connect worker properties is not supported. Configure the connector's internal Apache Kafka clients with iceberg.kafka.* properties instead.

{
  "name": "CONNECTOR_NAME",
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "2",
  "topics": "KAFKA_TOPICS",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "consumer.override.auto.offset.reset": "earliest",
  "iceberg.kafka.auto.offset.reset": "earliest",
  "iceberg.kafka.bootstrap.servers": "KAFKA_HOST:KAFKA_PORT",
  "iceberg.kafka.security.protocol": "SSL",
  "iceberg.kafka.ssl.key.password": "KEY_PASSWORD",
  "iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "iceberg.kafka.ssl.keystore.password": "KEYSTORE_PASSWORD",
  "iceberg.kafka.ssl.keystore.type": "PKCS12",
  "iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "iceberg.kafka.ssl.truststore.password": "TRUSTSTORE_PASSWORD",
  "iceberg.tables": "DATABASE.TABLE",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.control.commit.interval-ms": "1000",
  "iceberg.control.commit.timeout-ms": "20000",
  "iceberg.control.topic": "CONTROL_TOPIC",
  "iceberg.catalog.type": "rest",
  "iceberg.catalog.uri": "https://SNOWFLAKE_ACCOUNT_ID.AWS_REGION.snowflakecomputing.com/polaris/api/catalog",
  "iceberg.catalog.scope": "PRINCIPAL_ROLE:ROLE_NAME",
  "iceberg.catalog.credential": "CATALOG_CREDENTIAL",
  "iceberg.catalog.warehouse": "CATALOG_NAME",
  "iceberg.catalog.client.region": "AWS_REGION",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.s3.access-key-id": "AWS_ACCESS_KEY_ID",
  "iceberg.catalog.s3.secret-access-key": "AWS_SECRET_ACCESS_KEY",
  "iceberg.catalog.s3.path-style-access": "true"
}

Parameters:

  • name: Name of the connector
  • connector.class: Set to org.apache.iceberg.connect.IcebergSinkConnector
  • tasks.max: Maximum number of tasks the connector can run in parallel
  • topics: Apache Kafka topics to read data from
  • key.converter: Use org.apache.kafka.connect.json.JsonConverter
  • value.converter: Use org.apache.kafka.connect.json.JsonConverter
  • key.converter.schemas.enable: Enable (true) or disable (false) schema support in the key converter
  • value.converter.schemas.enable: Enable (true) or disable (false) schema support in the value converter
  • consumer.override.auto.offset.reset: Kafka consumer offset reset policy (recommended: earliest)
  • iceberg.kafka.auto.offset.reset: Offset reset policy for the Iceberg internal Apache Kafka consumer
  • iceberg.kafka.bootstrap.servers: Apache Kafka broker address in KAFKA_HOST:KAFKA_PORT format
  • iceberg.kafka.security.protocol: Use SSL for secure communication
  • iceberg.kafka.ssl.keystore.location: File path to the SSL keystore
  • iceberg.kafka.ssl.keystore.password: Keystore password
  • iceberg.kafka.ssl.keystore.type: Use PKCS12
  • iceberg.kafka.ssl.truststore.location: File path to the truststore
  • iceberg.kafka.ssl.truststore.password: Truststore password
  • iceberg.kafka.ssl.key.password: Password for the SSL private key
  • iceberg.tables: Iceberg table name in DATABASE.TABLE format
  • iceberg.tables.auto-create-enabled: Enable (true) or disable (false) automatic table creation
  • iceberg.control.commit.interval-ms: Frequency (in ms) to commit data to Iceberg
  • iceberg.control.commit.timeout-ms: Max time (in ms) to wait for a commit
  • iceberg.control.topic: Control topic used by Iceberg (default: control-iceberg)
  • iceberg.catalog.type: Set to rest for Snowflake Open Catalog
  • iceberg.catalog.uri: Polaris REST catalog endpoint (from the Snowflake Open Catalog console)
  • iceberg.catalog.scope: Role scope in the format PRINCIPAL_ROLE:ROLE_NAME
  • iceberg.catalog.credential: Authentication credential for Snowflake Open Catalog. Use the format CLIENT_ID:CLIENT_SECRET from the configured service connection that uses a Principal role
  • iceberg.catalog.warehouse: Name of the catalog created in Snowflake Open Catalog. This is not the S3 bucket name
  • iceberg.catalog.client.region: AWS region of the S3 bucket
  • iceberg.catalog.io-impl: Set to org.apache.iceberg.aws.s3.S3FileIO
  • iceberg.catalog.s3.access-key-id: AWS access key ID with write permissions
  • iceberg.catalog.s3.secret-access-key: AWS secret access key
  • iceberg.catalog.s3.path-style-access: Enable (true) or disable (false) path-style access
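As an alternative to the console flow in the next section, you can deploy the configuration file with the Aiven CLI. The following is a minimal sketch only: it assumes the avn client is installed and authenticated, KAFKA_SERVICE_NAME is a placeholder for your service name, and the configuration above is saved as iceberg_sink_connector.json.

# Sketch: deploy the connector configuration with the Aiven CLI (avn).
# KAFKA_SERVICE_NAME is a placeholder for your Aiven for Apache Kafka
# or Aiven for Apache Kafka Connect service name.
avn service connector create KAFKA_SERVICE_NAME @iceberg_sink_connector.json

# List the connectors on the service to confirm the connector was registered.
avn service connector list KAFKA_SERVICE_NAME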

Create the Iceberg sink connector

  1. Access the Aiven Console.
  2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
  3. Click Connectors.
  4. Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.

Alternatively, to enable connectors:

  1. Click Service settings in the sidebar.
  2. In the Service management section, click Actions > Enable Kafka connect.

Then continue with the connector setup:

  5. In the sink connectors list, select Iceberg Sink Connector, and click Get started.
  6. On the Iceberg Sink Connector page, go to the Common tab.
  7. Locate the Connector configuration text box and click Edit.
  8. Paste the configuration from your iceberg_sink_connector.json file into the text box.
  9. Click Create connector.
  10. Verify the connector status on the Connectors page. You can also check the status over the Kafka Connect REST API, as shown below.
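To verify the connector without the console, you can query the standard Kafka Connect REST API exposed by the service. This is a sketch only: SERVICE_URI, CONNECT_PORT, and AVNADMIN_PASSWORD are placeholders for the values shown on the service's connection information page, and CONNECTOR_NAME is the name set in the configuration.

# Sketch: check connector status through the Kafka Connect REST API.
# The /connectors/<name>/status endpoint is part of the standard
# Kafka Connect REST API; host, port, and password are placeholders.
curl -s -u avnadmin:AVNADMIN_PASSWORD \
  "https://SERVICE_URI:CONNECT_PORT/connectors/CONNECTOR_NAME/status"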

Example

This example shows a complete connector configuration that uses Snowflake Open Catalog (Polaris) as the catalog and Amazon S3 for storage.

{
  "name": "iceberg_sink_polaris",
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "2",
  "topics": "test-topic",
  "iceberg.catalog.type": "rest",
  "iceberg.catalog.uri": "https://1234567890.us-east-1.snowflakecomputing.com/polaris/api/catalog",
  "iceberg.catalog.scope": "PRINCIPAL_ROLE:my-role",
  "iceberg.catalog.credential": "my-client-id:my-client-secret",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.s3.access-key-id": "your-access-key-id",
  "iceberg.catalog.s3.secret-access-key": "your-secret-access-key",
  "iceberg.catalog.warehouse": "my-open-catalog",
  "iceberg.tables": "mydatabase.mytable",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.control.commit.interval-ms": "1000",
  "iceberg.control.commit.timeout-ms": "20000",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "iceberg.kafka.bootstrap.servers": "kafka.example.com:9092",
  "iceberg.kafka.security.protocol": "SSL",
  "iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "iceberg.kafka.ssl.keystore.password": "password",
  "iceberg.kafka.ssl.keystore.type": "PKCS12",
  "iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "iceberg.kafka.ssl.truststore.password": "password",
  "iceberg.kafka.ssl.key.password": "password"
}
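To exercise this example end to end, produce a JSON record to test-topic and confirm that the connector commits it to mydatabase.mytable. The following sketch uses the standard kafka-console-producer.sh tool; the record fields are illustrative only, and client-ssl.properties is an assumed client configuration file that points at the same keystore and truststore used in the connector configuration.

# Sketch: produce a JSON test record to the topic the connector reads from.
# client-ssl.properties is an assumed file with the SSL keystore and
# truststore settings for the Kafka service.
echo '{"id": 1, "name": "example"}' | kafka-console-producer.sh \
  --bootstrap-server kafka.example.com:9092 \
  --topic test-topic \
  --producer.config client-ssl.properties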
