Configure the Iceberg sink connector with a PostgreSQL JDBC catalog

The JDBC catalog stores Iceberg table metadata in a PostgreSQL® database. Use it to integrate Apache Kafka®, Amazon S3, and a PostgreSQL-based metadata store.

Prerequisites

Create an Iceberg sink connector configuration

To configure the Iceberg sink connector with the JDBC catalog, create a JSON file, for example iceberg_sink_connector.json, that uses PostgreSQL for metadata and Amazon S3 for storage.

Note: Loading worker properties is not supported. Use iceberg.kafka.* properties instead.

{
  "name": "CONNECTOR_NAME",
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "2",
  "topics": "KAFKA_TOPICS",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "consumer.override.auto.offset.reset": "earliest",
  "iceberg.kafka.auto.offset.reset": "earliest",
  "iceberg.tables": "DATABASE.TABLE",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.control.commit.interval-ms": "1000",
  "iceberg.control.commit.timeout-ms": "20000",
  "iceberg.catalog.type": "jdbc",
  "iceberg.catalog.uri": "jdbc:postgresql://HOST:PORT/DATABASE?user=USERNAME&password=PASSWORD&ssl=require",
  "iceberg.catalog.warehouse": "s3://BUCKET_NAME",
  "iceberg.catalog.client.region": "AWS_REGION",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.jdbc.useSSL": "true",
  "iceberg.catalog.jdbc.verifyServerCertificate": "true",
  "iceberg.catalog.s3.access-key-id": "AWS_ACCESS_KEY_ID",
  "iceberg.catalog.s3.secret-access-key": "AWS_SECRET_ACCESS_KEY",
  "iceberg.catalog.s3.path-style-access": "true",
  "iceberg.kafka.bootstrap.servers": "KAFKA_HOST:PORT",
  "iceberg.kafka.security.protocol": "SSL",
  "iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "iceberg.kafka.ssl.keystore.password": "KEYSTORE_PASSWORD",
  "iceberg.kafka.ssl.keystore.type": "PKCS12",
  "iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "iceberg.kafka.ssl.truststore.password": "TRUSTSTORE_PASSWORD",
  "iceberg.kafka.ssl.key.password": "KEY_PASSWORD"
}
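
Because the iceberg.catalog.uri value carries the PostgreSQL user name and password as query parameters, characters such as @ or & in a password can break the connection string unless they are percent-encoded. The following is a minimal Python sketch of one way to assemble the value. The helper name build_catalog_uri and the sample password are hypothetical, and the ssl=require parameter is copied from the configuration above rather than chosen here.

```python
from urllib.parse import quote, urlencode


def build_catalog_uri(host: str, port: int, database: str, user: str, password: str) -> str:
    """Assemble a value for iceberg.catalog.uri with percent-encoded credentials.

    Hypothetical helper for illustration; it is not part of the connector.
    """
    query = urlencode(
        {"user": user, "password": password, "ssl": "require"},
        quote_via=quote,  # encode spaces as %20 instead of '+'
    )
    return f"jdbc:postgresql://{host}:{port}/{quote(database, safe='')}?{query}"


# Sample values only; replace them with your own service details.
uri = build_catalog_uri("postgres.example.com", 5432, "iceberg_db", "iceberg_user", "p@ss&word")
print(uri)
```

The printed string can be pasted as the iceberg.catalog.uri value. Treat the result as a secret, since it contains the password.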

Parameters:

  • name: Name of the connector
  • connector.class: Connector implementation class
  • topics: Comma-separated list of Apache Kafka topics with source data
  • iceberg.catalog.type: Catalog type. Use jdbc for PostgreSQL
  • iceberg.catalog.uri: JDBC connection string for PostgreSQL
  • iceberg.catalog.warehouse: S3 bucket URI for table storage
  • iceberg.catalog.client.region: AWS region where the S3 bucket is located. Required if no region is set in the environment or system properties
  • iceberg.tables: Target Iceberg table in DATABASE.TABLE format
  • iceberg.tables.auto-create-enabled: Automatically create tables if they do not exist
  • iceberg.catalog.io-impl: File I/O implementation for S3
  • iceberg.catalog.s3.access-key-id and iceberg.catalog.s3.secret-access-key: AWS credentials for S3
  • iceberg.kafka.*: Apache Kafka connection and security settings

For the full list of parameters, see the Iceberg Kafka Connect configuration.
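
Before pasting the configuration into the console, it can help to check it against the parameters described above. The following Python sketch is one possible pre-flight check, not an official validator. The function name check_config and the EXPECTED_KEYS set are assumptions derived from the parameter list, and the sample values are placeholders.

```python
import json

# Keys taken from the parameter list above. This set is an assumption made
# for illustration, not an official list of mandatory settings.
EXPECTED_KEYS = {
    "name",
    "connector.class",
    "topics",
    "iceberg.tables",
    "iceberg.catalog.type",
    "iceberg.catalog.uri",
    "iceberg.catalog.warehouse",
    "iceberg.catalog.io-impl",
}


def check_config(config: dict) -> list[str]:
    """Return a list of human-readable problems found in a connector config."""
    problems = [f"missing key: {key}" for key in sorted(EXPECTED_KEYS - set(config))]
    if config.get("iceberg.catalog.type", "jdbc") != "jdbc":
        problems.append("iceberg.catalog.type should be jdbc for a PostgreSQL catalog")
    uri = config.get("iceberg.catalog.uri", "")
    if uri and not uri.startswith("jdbc:postgresql://"):
        problems.append("iceberg.catalog.uri should start with jdbc:postgresql://")
    warehouse = config.get("iceberg.catalog.warehouse", "")
    if warehouse and not warehouse.startswith("s3://"):
        problems.append("iceberg.catalog.warehouse should be an s3:// URI")
    # Worker properties are not loaded, so Kafka settings must use this prefix.
    if not any(key.startswith("iceberg.kafka.") for key in config):
        problems.append("no iceberg.kafka.* settings found")
    return problems


# Placeholder configuration; in practice, load your own file with
# json.load(open("iceberg_sink_connector.json")).
sample = json.loads("""
{
  "name": "iceberg_sink_jdbc",
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "topics": "test-topic",
  "iceberg.tables": "mydatabase.mytable",
  "iceberg.catalog.type": "jdbc",
  "iceberg.catalog.uri": "jdbc:postgresql://postgres.example.com:5432/iceberg_db",
  "iceberg.catalog.warehouse": "s3://my-s3-bucket",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.kafka.bootstrap.servers": "kafka.example.com:9092"
}
""")

problems = check_config(sample)
print(problems or "configuration looks complete")
```

An empty result only means that the keys listed here are present and well-formed. It does not confirm that the credentials, bucket, or hosts are reachable.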

Create the connector

  1. Access the Aiven Console.
  2. Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
  3. Click Connectors.
  4. Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.

     Alternatively, to enable connectors:

       1. Click Service settings in the sidebar.
       2. In the Service management section, click Actions > Enable Kafka connect.

  5. In the sink connectors list, select Iceberg Sink Connector, and click Get started.
  6. On the Iceberg Sink Connector page, go to the Common tab.
  7. Locate the Connector configuration text box and click Edit.
  8. Paste the configuration from your iceberg_sink_connector.json file into the text box.
  9. Click Create connector.
  10. Verify the connector status on the Connectors page.
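
The status can also be checked programmatically. The following Python sketch assumes you have obtained the status as JSON in the shape returned by the standard Kafka Connect REST endpoint GET /connectors/{name}/status. How you fetch that payload from your service is not covered here, and the function name summarize_status and the sample payload, including the error trace, are hypothetical.

```python
def summarize_status(status: dict) -> tuple[bool, list[str]]:
    """Return (healthy, messages) for a Kafka Connect connector status payload."""
    messages = []
    connector_state = status.get("connector", {}).get("state", "UNKNOWN")
    if connector_state != "RUNNING":
        messages.append(f"connector is {connector_state}")
    tasks = status.get("tasks", [])
    if not tasks:
        messages.append("no tasks reported")
    for task in tasks:
        state = task.get("state", "UNKNOWN")
        if state != "RUNNING":
            trace = task.get("trace", "")
            # Keep only the first line of the stack trace for a short summary.
            detail = trace.splitlines()[0] if trace else "no trace available"
            messages.append(f"task {task.get('id')} is {state}: {detail}")
    return (not messages, messages)


# Hypothetical payload for illustration; the trace text is made up.
sample_status = {
    "name": "iceberg_sink_jdbc",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {
            "id": 1,
            "state": "FAILED",
            "worker_id": "10.0.0.1:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: example failure\n\tat ...",
        },
    ],
    "type": "sink",
}

healthy, messages = summarize_status(sample_status)
print("healthy" if healthy else "\n".join(messages))
```

A connector can report RUNNING while one of its tasks has failed, which is why the sketch inspects each task separately.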

Example

This example creates an Iceberg sink connector with PostgreSQL as the catalog:

{
  "name": "iceberg_sink_jdbc",
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "2",
  "topics": "test-topic",
  "iceberg.catalog.type": "jdbc",
  "iceberg.catalog.uri": "jdbc:postgresql://postgres.example.com:5432/iceberg_db?user=iceberg_user&password=secret&ssl=require",
  "iceberg.catalog.warehouse": "s3://my-s3-bucket",
  "iceberg.catalog.client.region": "us-west-2",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.s3.access-key-id": "your-access-key-id",
  "iceberg.catalog.s3.secret-access-key": "your-secret-access-key",
  "iceberg.catalog.s3.path-style-access": "true",
  "iceberg.catalog.jdbc.useSSL": "true",
  "iceberg.catalog.jdbc.verifyServerCertificate": "true",
  "iceberg.tables": "mydatabase.mytable",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.control.commit.interval-ms": "1000",
  "iceberg.control.commit.timeout-ms": "20000",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "iceberg.kafka.bootstrap.servers": "kafka.example.com:9092",
  "iceberg.kafka.security.protocol": "SSL",
  "iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "iceberg.kafka.ssl.keystore.password": "password",
  "iceberg.kafka.ssl.keystore.type": "PKCS12",
  "iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "iceberg.kafka.ssl.truststore.password": "password",
  "iceberg.kafka.ssl.key.password": "password"
}