Create a sink connector from Apache Kafka® to Google BigQuery

Set up the BigQuery sink connector to move data from Aiven for Apache Kafka® into BigQuery tables for analysis and storage.

See the full list of parameters in the GitHub documentation.

Prerequisites

Collect the following details:

  • GOOGLE_CLOUD_PROJECT_NAME: Target Google Cloud project name

  • GOOGLE_CLOUD_SERVICE_KEY: Google Cloud service account key in JSON format

    warning

    When adding the service key to the connector configuration, provide it as an escaped string:

    • Escape all " characters as \"
    • Escape any \n in the private_key field as \\n

    Example: {\"type\": \"service_account\",\"project_id\": \"XXXXXX\", ...}

  • BIGQUERY_DATASET_NAME: BigQuery dataset name

  • TOPIC_LIST: Comma-separated list of Kafka topics to sink

  • Only if using Avro: Schema Registry connection details:

    • APACHE_KAFKA_HOST
    • SCHEMA_REGISTRY_PORT
    • SCHEMA_REGISTRY_USER
    • SCHEMA_REGISTRY_PASSWORD
note

Schema Registry connection details are available in the Overview page of your Kafka service, under Connection information, in the Schema Registry tab. Aiven uses Karapace as the Schema Registry.
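
For illustration, a connector configuration fragment with an escaped keyfile value could look like the following; every value is a shortened placeholder:

{
  "name": "bigquery-sink",
  "keyfile": "{\"type\": \"service_account\", \"project_id\": \"my-project\", \"private_key_id\": \"abc123\", \"private_key\": \"-----BEGIN PRIVATE KEY-----\\nMIIEvQIBADANBg...\\n-----END PRIVATE KEY-----\\n\", \"client_email\": \"bigquery-sink@my-project.iam.gserviceaccount.com\"}"
}

Each " inside the key becomes \", and the \n sequences in the private_key field become \\n.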

Google credential source restrictions

When using Google Cloud external account credentials with Aiven for Apache Kafka® Connect, Aiven applies security restrictions to prevent unauthorized file access and network requests.

If the Google Cloud credential JSON file includes a credential_source object, the following restrictions apply:

  • credential_source.file: Not allowed
  • credential_source.executable.command: Not allowed
  • credential_source.url: Allowed only when allow-listed in the service configuration (gcp_auth_allowed_urls)

Example credential_source object using a URL-based credential:

{
  "credential_source": {
    "url": "https://sts.googleapis.com/v1/token",
    "headers": {
      "Metadata-Flavor": "Google"
    }
  }
}
important

gcp_auth_allowed_urls is a Kafka Connect service-level configuration, not a connector configuration. Configure it in the Kafka Connect service settings. This setting applies to all connectors in the service.

Configure allowed authentication URLs

To use URL-based credentials (credential_source.url), configure the following:

  1. Configure the Kafka Connect service with allowed authentication URLs.
  2. Configure each connector to use one of the allowed URLs.

Set gcp_auth_allowed_urls on the Kafka Connect service to define which HTTPS authentication endpoints the service can access. This setting applies to all connectors in the service.

  1. Go to the Aiven Console.
  2. Select the Aiven for Apache Kafka Connect service.
  3. Click Service settings.
  4. In Advanced configuration, click Configure.
  5. Set gcp_auth_allowed_urls to the required HTTPS endpoints.
  6. Click Save configuration.

If multiple connectors use URL-based credentials, add all required authentication URLs to gcp_auth_allowed_urls. Each unique URL needs to be added only once.

If credential_source.url is set but the URL is not included in gcp_auth_allowed_urls, connector creation fails.

In the connector configuration JSON, set credential_source.url to match one of the URLs configured in the service.
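
As a hedged sketch, a URL-based external account credential passed in keyfile could look like the following; all identifiers and URLs are placeholders, and the exact file format is defined by Google's workload identity federation documentation:

{
  "type": "external_account",
  "audience": "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/my-pool/providers/my-provider",
  "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
  "token_url": "https://sts.googleapis.com/v1/token",
  "credential_source": {
    "url": "https://idp.example.com/oauth2/token",
    "format": {
      "type": "json",
      "subject_token_field_name": "access_token"
    }
  }
}

In this sketch, https://idp.example.com/oauth2/token is the credential_source.url and must be included in gcp_auth_allowed_urls on the Kafka Connect service.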

Configure Google Cloud

  1. Create a service account and generate a JSON key: In the Google Cloud Console, create a service account and generate a JSON key. See Google’s guide for details. You’ll use this key in the connector configuration.

  2. Enable the BigQuery API: In the API & Services dashboard, enable the BigQuery API if it isn’t already. See Google’s reference for details.

  3. Create a BigQuery dataset: In the BigQuery Console, create a dataset or use an existing one. See Google’s guide. Select a region close to your Kafka service to reduce latency.

  4. Grant dataset access to the service account: In the BigQuery Console, grant your service account the BigQuery Data Editor role on the dataset. See Google’s access control guide.

Write methods

  • Google Cloud Storage (default): Uses GCS as an intermediate step. Supports all features, including delete and upsert. Parameters used only with delete or upsert (see the example after this section):

    • intermediateTableSuffix
    • kafkaKeyFieldName (required)
    • mergeIntervalMs
  • Storage Write API: Streams data directly into BigQuery. Enable by setting useStorageWriteApi to true. This method provides lower latency for streaming workloads. Parameters used only with Storage Write API:

    • bigQueryPartitionDecorator
    • commitInterval
    • enableBatchMode
    warning

    Do not use the Storage Write API with deleteEnabled or upsertEnabled.

If useStorageWriteApi is not set, the connector uses the standard Google Cloud Storage API by default.
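
For example, an upsert configuration using the default Google Cloud Storage write method could add settings like these to the connector configuration; the interval, suffix, and field name values are illustrative:

{
  "upsertEnabled": "true",
  "kafkaKeyFieldName": "kafka_key",
  "mergeIntervalMs": "60000",
  "intermediateTableSuffix": "_tmp"
}

The Kafka record key, exposed through kafkaKeyFieldName, is what the connector uses to match rows during the merge.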

Create a BigQuery sink connector configuration

Define the connector configuration in a JSON file, for example bigquery_sink.json.

{
  "name": "CONNECTOR_NAME",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "TOPIC_LIST",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "schemaRegistryClient.basic.auth.credentials.source": "URL",
  "schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "autoCreateTables": "true",
  "allBQFieldsNullable": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}
note

To use the Storage Write API instead of the default GCS method, add "useStorageWriteApi": "true" to the configuration.

The configuration file includes:

  • name: Connector name

  • connector.class: Must be com.wepay.kafka.connect.bigquery.BigQuerySinkConnector

  • topics: Comma-separated list of Kafka topics to write to BigQuery

  • project: Target Google Cloud project name

  • defaultDataset: BigQuery dataset name, prefixed with .*=

    note

    By default, table names in BigQuery match the Kafka topic names. Use the Kafka Connect RegexRouter transformation to rename tables if needed.

If your messages are in Avro format, also set these parameters:

  • schemaRegistryLocation: Karapace schema registry endpoint
  • key.converter and value.converter: Set to io.confluent.connect.avro.AvroConverter for Avro
  • key.converter.schema.registry.url and value.converter.schema.registry.url: Schema registry URL (https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT)
  • key.converter.schema.registry.basic.auth.user.info and value.converter.schema.registry.basic.auth.user.info: Schema registry credentials in the format SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD

For table management, you can set:

  • autoCreateTables: Create target BigQuery tables automatically if they do not exist
  • allBQFieldsNullable: Set new BigQuery fields to NULLABLE instead of REQUIRED

For schema evolution, you can also set:

  • allowNewBigQueryFields: Add new fields from Kafka schemas to BigQuery tables
  • allowBigQueryRequiredFieldRelaxation: Relax REQUIRED fields back to NULLABLE
warning

Automatic schema evolution reduces control over table definitions and may cause errors if message schemas change in ways BigQuery cannot support.
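
For example, a fragment combining the table management and schema evolution settings above could look like this; merge it into the full connector configuration:

{
  "autoCreateTables": "true",
  "allBQFieldsNullable": "true",
  "allowNewBigQueryFields": "true",
  "allowBigQueryRequiredFieldRelaxation": "true"
}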

For authentication, set:

  • keySource: Format of the Google Cloud service account key, set to JSON
  • keyfile: Google Cloud service account key as an escaped string

Create a BigQuery sink connector

  1. Go to the Aiven Console.
  2. Select your Aiven for Apache Kafka® or Kafka Connect service.
  3. Click Connectors.
  4. Click Create connector (enable Kafka Connect if required).
  5. Select Google BigQuery Sink from the list.
  6. On the Common tab, click Edit in the Connector configuration box.
  7. Paste the contents of bigquery_sink.json. Replace placeholders with actual values.
  8. Click Apply, then Create connector.
  9. Verify the connector status on the Connectors page.
  10. Check that data appears in the BigQuery dataset. By default, table names match topic names. Use the Kafka Connect RegexRouter transformation to rename tables if required.
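
For example, to strip a prefix from topic names before they are used as table names, you can add a RegexRouter transform to the connector configuration; the transform name, regex, and replacement below are illustrative:

{
  "transforms": "renameTables",
  "transforms.renameTables.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.renameTables.regex": "prod_(.*)",
  "transforms.renameTables.replacement": "$1"
}

With this transform, records from a topic named prod_iot_measurements are written to a table named iot_measurements.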

Examples

Sink a JSON topic

Suppose you have a topic iot_measurements containing JSON messages with an inline schema:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int64", "field": "iot_id" },
      { "type": "string", "field": "metric" },
      { "type": "int32", "field": "measurement" }
    ]
  },
  "payload": { "iot_id": 1, "metric": "Temperature", "measurement": 14 }
}

Connector configuration:

{
  "name": "iot_sink",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "iot_measurements",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "autoCreateTables": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}

Parameters:

  • topics: Source topic
  • value.converter: JSON converter; the schema is read from the schema field embedded in each message rather than from a schema registry
note

Inline JSON schemas increase message size and add processing overhead. For efficiency, prefer Avro format with Karapace Schema Registry.

Sink an Avro topic

Suppose you have a topic students with messages in Avro format and schemas stored in Karapace.

Connector configuration:

{
  "name": "students_sink",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "students",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "autoCreateTables": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}

Parameters:

  • topics: Source topic
  • key.converter and value.converter: Enable Avro parsing with Karapace schema registry

Sink an Avro topic using Storage Write API

Use the Storage Write API to stream Avro messages directly into BigQuery with lower latency.

Connector configuration:

{
  "name": "students_sink_write_api",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "students",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "autoCreateTables": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY",
  "useStorageWriteApi": "true",
  "commitInterval": "1000"
}

Parameters:

  • useStorageWriteApi: Enables direct streaming into BigQuery
  • commitInterval: Flush interval for Storage Write API batches (in ms)