Configure the Iceberg sink connector with a PostgreSQL JDBC catalog
The JDBC catalog stores Iceberg table metadata in a PostgreSQL® database. Use it to integrate Apache Kafka®, Amazon S3, and a PostgreSQL-based metadata store.
Prerequisites
- Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service
- Apache Kafka topic that contains the source data
- Apache Kafka control topic used to coordinate Iceberg table commits (for example, control-iceberg)
- Aiven for PostgreSQL® service or another PostgreSQL instance that is publicly accessible
- Amazon S3 bucket for storing Iceberg table data
- IAM user or role with permissions to read and write to the S3 bucket
- AWS access key ID and secret access key for the IAM credentials
Create an Iceberg sink connector configuration
To configure the Iceberg sink connector with the JDBC catalog, define a JSON file (for example, iceberg_sink_connector.json) that uses PostgreSQL for metadata and Amazon S3 for storage.
Note: Loading worker properties is not supported. Use iceberg.kafka.* properties instead.
{
"name": "CONNECTOR_NAME",
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "KAFKA_TOPICS",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"consumer.override.auto.offset.reset": "earliest",
"iceberg.kafka.auto.offset.reset": "earliest",
"iceberg.tables": "DATABASE.TABLE",
"iceberg.tables.auto-create-enabled": "true",
"iceberg.control.commit.interval-ms": "1000",
"iceberg.control.commit.timeout-ms": "20000",
"iceberg.catalog.type": "jdbc",
"iceberg.catalog.uri": "jdbc:postgresql://HOST:PORT/DATABASE?user=USERNAME&password=PASSWORD&ssl=require",
"iceberg.catalog.warehouse": "s3://BUCKET_NAME",
"iceberg.catalog.client.region": "AWS_REGION",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.jdbc.useSSL": "true",
"iceberg.catalog.jdbc.verifyServerCertificate": "true",
"iceberg.catalog.s3.access-key-id": "AWS_ACCESS_KEY_ID",
"iceberg.catalog.s3.secret-access-key": "AWS_SECRET_ACCESS_KEY",
"iceberg.catalog.s3.path-style-access": "true",
"iceberg.kafka.bootstrap.servers": "KAFKA_HOST:PORT",
"iceberg.kafka.security.protocol": "SSL",
"iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
"iceberg.kafka.ssl.keystore.password": "KEYSTORE_PASSWORD",
"iceberg.kafka.ssl.keystore.type": "PKCS12",
"iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
"iceberg.kafka.ssl.truststore.password": "TRUSTSTORE_PASSWORD",
"iceberg.kafka.ssl.key.password": "KEY_PASSWORD"
}
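Because worker properties are not loaded, any Kafka client setting the connector needs has to appear in the connector configuration under the iceberg.kafka. prefix. The following is a minimal sketch of that renaming, assuming the client settings are held in a Python dictionary. The helper name with_iceberg_kafka_prefix and the sample values are illustrative and are not part of the connector or of any Aiven tooling.

```python
def with_iceberg_kafka_prefix(client_props: dict[str, str]) -> dict[str, str]:
    """Return a copy of client_props with every key under 'iceberg.kafka.'."""
    prefix = "iceberg.kafka."
    return {
        # Keys that already carry the prefix are left unchanged.
        key if key.startswith(prefix) else prefix + key: value
        for key, value in client_props.items()
    }


# Hypothetical client settings that might otherwise live in worker properties.
client_props = {
    "bootstrap.servers": "KAFKA_HOST:PORT",
    "security.protocol": "SSL",
    "ssl.keystore.type": "PKCS12",
}

# Merge the prefixed settings into a partial connector configuration.
connector_config = {"name": "CONNECTOR_NAME", "iceberg.catalog.type": "jdbc"}
connector_config.update(with_iceberg_kafka_prefix(client_props))
```

After the merge, connector_config holds keys such as iceberg.kafka.bootstrap.servers, which match the form used in the template above.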
Parameters:
- name: Name of the connector
- connector.class: Connector implementation class
- topics: Comma-separated list of Apache Kafka topics with source data
- iceberg.catalog.type: Catalog type. Use jdbc for PostgreSQL
- iceberg.catalog.uri: JDBC connection string for PostgreSQL
- iceberg.catalog.warehouse: S3 bucket URI for table storage
- iceberg.catalog.client.region: AWS region where the S3 bucket is located. Required if no region is set in the environment or system properties
- iceberg.tables: Target Iceberg table in DATABASE.TABLE format
- iceberg.tables.auto-create-enabled: Automatically create tables if they do not exist
- iceberg.catalog.io-impl: File I/O implementation for S3
- iceberg.catalog.s3.access-key-id and iceberg.catalog.s3.secret-access-key: AWS credentials for S3
- iceberg.kafka.*: Apache Kafka security settings
For the full list of parameters, see the Iceberg Kafka Connect configuration.
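The iceberg.catalog.uri value embeds the PostgreSQL user name and password as query parameters, so characters such as @ or & in a password can break the connection string. The sketch below assembles the URI with percent-encoded credentials using only the Python standard library. The function name build_catalog_uri is hypothetical, and the sketch assumes the JDBC driver decodes percent-encoded parameter values, which is worth confirming against the driver version in use.

```python
from urllib.parse import quote, urlencode


def build_catalog_uri(host: str, port: int, database: str,
                      username: str, password: str) -> str:
    """Assemble a PostgreSQL JDBC URI in the shape used by the template."""
    # quote_via=quote encodes spaces as %20 instead of '+', and every
    # reserved character (for example '@' and '&') is percent-encoded.
    query = urlencode(
        {"user": username, "password": password, "ssl": "require"},
        quote_via=quote,
    )
    return f"jdbc:postgresql://{host}:{port}/{quote(database, safe='')}?{query}"


# Placeholder values for illustration only.
uri = build_catalog_uri(
    "postgres.example.com", 5432, "iceberg_db", "iceberg_user", "p@ss&word"
)
print(uri)
```

The resulting string can be pasted as the value of iceberg.catalog.uri in the JSON file.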
Create the connector
Using the Aiven Console:
- Access the Aiven Console.
- Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
- Click Connectors.
- Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.
  Alternatively, to enable connectors:
  - Click Service settings in the sidebar.
  - In the Service management section, click Actions > Enable Kafka connect.
- In the sink connectors list, select Iceberg Sink Connector, and click Get started.
- On the Iceberg Sink Connector page, go to the Common tab.
- Locate the Connector configuration text box and click Edit.
- Paste the configuration from your iceberg_sink_connector.json file into the text box.
- Click Create connector.
- Verify the connector status on the Connectors page.
To create the Iceberg sink connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @iceberg_sink_connector.json
Parameters:
- SERVICE_NAME: Name of your Aiven for Apache Kafka® service.
- @iceberg_sink_connector.json: Path to the JSON configuration file.
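Before creating the connector, it can help to confirm that the JSON file parses and that no template placeholder was left unreplaced. The following is a rough pre-flight check, not part of the Aiven CLI. The list of required keys is an assumption based on the template in this page, and the function names are illustrative.

```python
import json

# Keys this sketch treats as required (an assumption based on the template).
REQUIRED_KEYS = (
    "name",
    "connector.class",
    "topics",
    "iceberg.tables",
    "iceberg.catalog.type",
    "iceberg.catalog.uri",
    "iceberg.catalog.warehouse",
)

# Placeholder tokens from the template that should not reach a real config.
PLACEHOLDER_TOKENS = (
    "CONNECTOR_NAME", "KAFKA_TOPICS", "DATABASE.TABLE", "HOST:PORT",
    "USERNAME", "PASSWORD", "BUCKET_NAME", "AWS_REGION",
    "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
)


def load_config(path: str) -> dict:
    """Parse the connector configuration file; raises on invalid JSON."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


def check_config(config: dict) -> list[str]:
    """Return human-readable problems; an empty list means none were found."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]
    for key, value in config.items():
        leftover = [token for token in PLACEHOLDER_TOKENS if token in str(value)]
        if leftover:
            problems.append(f"placeholder left in {key}: {leftover[0]}")
    return problems


# In-memory demo. For a real file, call
# check_config(load_config("iceberg_sink_connector.json")) instead.
demo = {"name": "iceberg_sink_jdbc", "topics": "KAFKA_TOPICS"}
for problem in check_config(demo):
    print(problem)
```

The placeholder check is a plain substring match, so it can flag a genuine value that happens to contain one of the tokens. Treat its output as a prompt to review the file, not as a definitive verdict.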
Example
This example creates an Iceberg sink connector with PostgreSQL as the catalog:
{
"name": "iceberg_sink_jdbc",
"connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "test-topic",
"iceberg.catalog.type": "jdbc",
"iceberg.catalog.uri": "jdbc:postgresql://postgres.example.com:5432/iceberg_db?user=iceberg_user&password=secret&ssl=require",
"iceberg.catalog.warehouse": "s3://my-s3-bucket",
"iceberg.catalog.client.region": "us-west-2",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.s3.access-key-id": "your-access-key-id",
"iceberg.catalog.s3.secret-access-key": "your-secret-access-key",
"iceberg.catalog.s3.path-style-access": "true",
"iceberg.catalog.jdbc.useSSL": "true",
"iceberg.catalog.jdbc.verifyServerCertificate": "true",
"iceberg.tables": "mydatabase.mytable",
"iceberg.tables.auto-create-enabled": "true",
"iceberg.control.commit.interval-ms": "1000",
"iceberg.control.commit.timeout-ms": "20000",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"iceberg.kafka.bootstrap.servers": "kafka.example.com:9092",
"iceberg.kafka.security.protocol": "SSL",
"iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
"iceberg.kafka.ssl.keystore.password": "password",
"iceberg.kafka.ssl.keystore.type": "PKCS12",
"iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
"iceberg.kafka.ssl.truststore.password": "password",
"iceberg.kafka.ssl.key.password": "password"
}
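The example sets value.converter.schemas.enable to false, so the JsonConverter reads each record value as a plain JSON object and does not expect a schema and payload envelope. As a small illustration, the sketch below serializes a record in that shape. The field names are hypothetical, and producing the bytes to test-topic is left to whichever Kafka client you use.

```python
import json


def encode_record(record: dict) -> bytes:
    """Serialize a record as compact UTF-8 JSON with no schema envelope."""
    return json.dumps(record, separators=(",", ":")).encode("utf-8")


# Hypothetical record; field names are for illustration only.
sample = {"id": 1, "name": "alice", "created_at": "2024-01-01T00:00:00Z"}
payload = encode_record(sample)
print(payload.decode("utf-8"))
```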
Related pages