Create a sink connector from Apache Kafka® to Google BigQuery

Set up the BigQuery sink connector to move data from Aiven for Apache Kafka® into BigQuery tables for analysis and storage.

See the full list of parameters in the GitHub documentation.

Prerequisites

Collect the following details:

  • GOOGLE_CLOUD_PROJECT_NAME: Target Google Cloud project name

  • GOOGLE_CLOUD_SERVICE_KEY: Google Cloud service account key in JSON format

    warning

    When adding the service key to the connector configuration, provide it as an escaped string (a helper sketch follows these prerequisites):

    • Escape all " characters as \"
    • Escape any \n in the private_key field as \\n

    Example: {\"type\": \"service_account\",\"project_id\": \"XXXXXX\", ...}

  • BIGQUERY_DATASET_NAME: BigQuery dataset name

  • TOPIC_LIST: Comma-separated list of Kafka topics to sink

  • Schema Registry connection details (required only if your messages use Avro):

    • APACHE_KAFKA_HOST
    • SCHEMA_REGISTRY_PORT
    • SCHEMA_REGISTRY_USER
    • SCHEMA_REGISTRY_PASSWORD
note

Schema Registry connection details are available in the Overview page of your Kafka service, under Connection information, in the Schema Registry tab. Aiven uses Karapace as the Schema Registry.
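
If you prefer to produce the escaped service key with a script rather than by hand, you can let a JSON serializer do the escaping. A minimal Python sketch, assuming the key was saved as service-account-key.json (a hypothetical file name):

import json

# Hypothetical path; adjust to where you saved the downloaded key.
with open("service-account-key.json") as f:
    key = json.load(f)

# Serialize the key on a single line, then escape it again so every "
# becomes \" and the \n sequences in private_key become \\n, as required
# by the connector configuration.
single_line = json.dumps(key, separators=(",", ":"))
escaped = json.dumps(single_line)

# Strip the outer quotes added by the second json.dumps() and paste the
# result as the GOOGLE_CLOUD_SERVICE_KEY value.
print(escaped[1:-1])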

Configure Google Cloud

  1. Create a service account and generate a JSON key: In the Google Cloud Console, create a service account and download a JSON key for it. See Google’s guide for details. You’ll use this key in the connector configuration.

  2. Enable the BigQuery API: In the APIs & Services dashboard, enable the BigQuery API if it isn’t already enabled. See Google’s reference for details.

  3. Create a BigQuery dataset: In the BigQuery Console, create a dataset or use an existing one. See Google’s guide. Select a region close to your Kafka service to reduce latency.

  4. Grant dataset access to the service account: In the BigQuery Console, grant your service account the BigQuery Data Editor role on the dataset. See Google’s access control guide.
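
If you prefer to script steps 3 and 4, the google-cloud-bigquery Python client can create the dataset and grant the service account access. A minimal sketch, using hypothetical project, dataset, and service account names and your own Application Default Credentials:

from google.cloud import bigquery

# Hypothetical names; replace with your own values.
PROJECT = "my-gcp-project"
DATASET = "kafka_sink"
SERVICE_ACCOUNT = "kafka-bq-sink@my-gcp-project.iam.gserviceaccount.com"

# Uses your own Application Default Credentials (for example, gcloud auth).
client = bigquery.Client(project=PROJECT)

# Step 3: create the dataset in a region close to your Kafka service.
dataset = bigquery.Dataset(f"{PROJECT}.{DATASET}")
dataset.location = "EU"
dataset = client.create_dataset(dataset, exists_ok=True)

# Step 4: give the service account write access to the dataset
# (dataset-level WRITER corresponds to the BigQuery Data Editor role).
entries = list(dataset.access_entries)
entries.append(
    bigquery.AccessEntry(
        role="WRITER",
        entity_type="userByEmail",
        entity_id=SERVICE_ACCOUNT,
    )
)
dataset.access_entries = entries
client.update_dataset(dataset, ["access_entries"])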

Create a BigQuery sink connector configuration

Define the connector configuration in a JSON file, for example bigquery_sink.json.

{
  "name": "CONNECTOR_NAME",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "TOPIC_LIST",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "schemaRegistryClient.basic.auth.credentials.source": "URL",
  "schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "autoCreateTables": "true",
  "allBQFieldsNullable": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}

The configuration file includes:

  • name: Connector name
  • connector.class: Must be com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
  • topics: Comma-separated list of Kafka topics to write to BigQuery
  • project: Target Google Cloud project name
  • defaultDataset: BigQuery dataset name, prefixed with .*=

If your messages are in Avro format, also set these parameters:

  • schemaRegistryLocation: Karapace schema registry endpoint
  • key.converter and value.converter: Set to io.confluent.connect.avro.AvroConverter for Avro
  • key.converter.schema.registry.url and value.converter.schema.registry.url: Schema registry URL (https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT)
  • key.converter.schema.registry.basic.auth.user.info and value.converter.schema.registry.basic.auth.user.info: Schema registry credentials in the format SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD

For table management, you can set:

  • autoCreateTables: Create target BigQuery tables automatically if they do not exist
  • allBQFieldsNullable: Set new BigQuery fields to NULLABLE instead of REQUIRED

For schema evolution, you can also set:

  • allowNewBigQueryFields: Add new fields from Kafka schemas to BigQuery tables
  • allowBigQueryRequiredFieldRelaxation: Relax REQUIRED fields back to NULLABLE
warning

Automatic schema evolution reduces control over table definitions and may cause errors if message schemas change in ways BigQuery cannot support.

For authentication, set:

  • keySource: Format of the Google Cloud service account key, set to JSON
  • keyfile: Google Cloud service account key as an escaped string
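
If you generate bigquery_sink.json with a script, a JSON serializer handles the keyfile escaping for you, so the manual escaping described in the prerequisites is not needed. A minimal Python sketch, assuming Avro values and the same placeholder names used above (the key file path is hypothetical); add the remaining key converter and schema registry settings as in the example configuration:

import json

# Hypothetical placeholders; replace with your own values.
SCHEMA_REGISTRY = "APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT"
SR_USER_INFO = "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD"

# Read the raw service account key; json.dump() below escapes it correctly
# for the keyfile field, so no manual escaping is needed.
with open("service-account-key.json") as f:
    service_key = f.read()

config = {
    "name": "bigquery_sink",
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "TOPIC_LIST",
    "project": "GOOGLE_CLOUD_PROJECT_NAME",
    "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": f"https://{SCHEMA_REGISTRY}",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": SR_USER_INFO,
    "autoCreateTables": "true",
    "keySource": "JSON",
    "keyfile": service_key,
}

with open("bigquery_sink.json", "w") as f:
    json.dump(config, f, indent=2)

The generated file can then be pasted into the console as described in the next section.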

Create a BigQuery sink connector

  1. Go to the Aiven Console.
  2. Select your Aiven for Apache Kafka® or Kafka Connect service.
  3. Click Connectors.
  4. Click Create connector (enable Kafka Connect if required).
  5. Select Google BigQuery Sink from the list.
  6. On the Common tab, click Edit in the Connector configuration box.
  7. Paste the contents of bigquery_sink.json. Replace placeholders with actual values.
  8. Click Apply, then Create connector.
  9. Verify the connector status on the Connectors page.
  10. Check that data appears in the BigQuery dataset. By default, table names match topic names. Use the Kafka Connect RegexRouter transformation to rename tables if required.
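
To check that rows are arriving, you can also query the target table directly. A minimal sketch with the google-cloud-bigquery client, using hypothetical project, dataset, and table names (by default the table name matches the topic name):

from google.cloud import bigquery

# Hypothetical names; replace with your own values.
PROJECT = "my-gcp-project"
DATASET = "kafka_sink"
TABLE = "iot_measurements"

# Uses your Application Default Credentials.
client = bigquery.Client(project=PROJECT)

# Show a small sample of the rows written by the connector.
query = f"SELECT * FROM `{PROJECT}.{DATASET}.{TABLE}` LIMIT 5"
for row in client.query(query).result():
    print(dict(row))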

Examples

Sink a JSON topic

Suppose you have a topic iot_measurements containing JSON messages with an inline schema:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int64", "field": "iot_id" },
      { "type": "string", "field": "metric" },
      { "type": "int32", "field": "measurement" }
    ]
  },
  "payload": { "iot_id": 1, "metric": "Temperature", "measurement": 14 }
}

Connector configuration:

{
  "name": "iot_sink",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "iot_measurements",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "autoCreateTables": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}

  • topics: Source topic
  • value.converter: JSON converter; the schema is read from each message's inline schema field, so no schema registry is needed
note

Inline JSON schemas increase message size and add processing overhead. For efficiency, prefer Avro format with Karapace Schema Registry.
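
For reference, a producer writing messages in this inline-schema format to iot_measurements could look like the following. A sketch using the kafka-python client, with hypothetical connection details; the ca.pem, service.cert, and service.key files can be downloaded from the service's Connection information:

import json
from kafka import KafkaProducer

# Hypothetical connection details; use your Aiven for Apache Kafka service URI
# and the certificate files downloaded from the service page.
producer = KafkaProducer(
    bootstrap_servers="APACHE_KAFKA_HOST:APACHE_KAFKA_PORT",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# The schema travels with every message, which is what lets the connector
# map fields to BigQuery columns without a schema registry.
message = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int64", "field": "iot_id"},
            {"type": "string", "field": "metric"},
            {"type": "int32", "field": "measurement"},
        ],
    },
    "payload": {"iot_id": 1, "metric": "Temperature", "measurement": 14},
}

producer.send("iot_measurements", message)
producer.flush()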

Sink an Avro topic

Suppose you have a topic students with messages in Avro format and schemas stored in Karapace.

Connector configuration:

{
  "name": "students_sink",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "students",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "autoCreateTables": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}

  • topics: Source topic
  • key.converter and value.converter: Enable Avro parsing using the schemas stored in the Karapace schema registry
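
For reference, a producer that registers the value schema in Karapace and writes Avro messages to students could look like the following. A sketch using the confluent-kafka client, with hypothetical connection details, the credential placeholders from above, and a minimal student schema:

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Hypothetical Avro schema for the students topic.
SCHEMA = """
{
  "type": "record",
  "name": "student",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"}
  ]
}
"""

# Karapace endpoint and credentials (Connection information > Schema Registry).
sr_client = SchemaRegistryClient(
    {"url": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT"}
)
serializer = AvroSerializer(sr_client, SCHEMA)

# Kafka connection using the certificates downloaded from the service page.
producer = Producer(
    {
        "bootstrap.servers": "APACHE_KAFKA_HOST:APACHE_KAFKA_PORT",
        "security.protocol": "SSL",
        "ssl.ca.location": "ca.pem",
        "ssl.certificate.location": "service.cert",
        "ssl.key.location": "service.key",
    }
)

# Serialize the record with the schema registered in Karapace and send it.
record = {"id": 1, "name": "Mary"}
producer.produce(
    "students",
    value=serializer(record, SerializationContext("students", MessageField.VALUE)),
)
producer.flush()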