Create a sink connector from Apache Kafka® to Google BigQuery
Set up the BigQuery sink connector to move data from Aiven for Apache Kafka® into BigQuery tables for analysis and storage.
See the full list of parameters in the GitHub documentation.
Prerequisites
- An Aiven for Apache Kafka service with Apache Kafka Connect enabled, or a dedicated Kafka Connect cluster
- A Google Cloud project with:
  - A service account and JSON service key
  - BigQuery API enabled
  - A BigQuery dataset to store data
  - Dataset access granted to the service account
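If Kafka Connect is not yet enabled on your Aiven for Apache Kafka service, you can enable it in the Aiven Console or, as a minimal sketch with the Aiven CLI (substitute your own service name):

avn service update SERVICE_NAME -c kafka_connect=true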
Collect the following details:
- GOOGLE_CLOUD_PROJECT_NAME: Target Google Cloud project name
- GOOGLE_CLOUD_SERVICE_KEY: Google Cloud service account key in JSON format.
  Warning: When adding the service key to the connector configuration, provide it as an escaped string (see the escaping sketch after this list):
  - Escape all " characters as \"
  - Escape any \n in the private_key field as \\n
  Example: {\"type\": \"service_account\",\"project_id\": \"XXXXXX\", ...}
- BIGQUERY_DATASET_NAME: BigQuery dataset name
- TOPIC_LIST: Comma-separated list of Kafka topics to sink
- Only if using Avro: Schema Registry connection details:
  - APACHE_KAFKA_HOST
  - SCHEMA_REGISTRY_PORT
  - SCHEMA_REGISTRY_USER
  - SCHEMA_REGISTRY_PASSWORD
  Schema Registry connection details are available on the Overview page of your Kafka service, under Connection information, in the Schema Registry tab. Aiven uses Karapace as the Schema Registry.
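As a minimal sketch of the escaping step, assuming the downloaded key is saved as key.json and that jq is available:

# Compact the key to one line, then re-encode it as a JSON string:
# " becomes \" and the \n sequences in private_key become \\n.
# The output (including the surrounding quotes) is the keyfile value.
jq -c . key.json | jq -R .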
Configure Google Cloud
- Create a service account and generate a JSON key: In the Google Cloud Console, create a service account and generate a JSON key. See Google’s guide for details. You’ll use this key in the connector configuration.
- Enable the BigQuery API: In the API & Services dashboard, enable the BigQuery API if it isn’t already. See Google’s reference for details.
- Create a BigQuery dataset: In the BigQuery Console, create a dataset or use an existing one. See Google’s guide. Select a region close to your Kafka service to reduce latency.
- Grant dataset access to the service account: In the BigQuery Console, grant your service account the BigQuery Data Editor role on the dataset. See Google’s access control guide. A command-line sketch of these steps follows.
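The following is a minimal sketch of the same setup using the gcloud and bq command-line tools. The project, service account, dataset, and location values (my-project, kafka-bq-sink, kafka_sink_dataset, EU) are placeholders, and this sketch grants the BigQuery Data Editor role at the project level; to scope access to a single dataset, use the dataset-level access controls in the BigQuery Console as described above.

# Create a service account and download a JSON key for it
gcloud iam service-accounts create kafka-bq-sink --project=my-project
gcloud iam service-accounts keys create key.json \
  --iam-account=kafka-bq-sink@my-project.iam.gserviceaccount.com

# Enable the BigQuery API
gcloud services enable bigquery.googleapis.com --project=my-project

# Create a dataset in a region close to your Kafka service
bq mk --dataset --location=EU my-project:kafka_sink_dataset

# Grant the service account the BigQuery Data Editor role (project-wide)
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:kafka-bq-sink@my-project.iam.gserviceaccount.com" \
  --role="roles/bigquery.dataEditor"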
Create a BigQuery sink connector configuration
Define the connector configuration in a JSON file, for example bigquery_sink.json.
{
"name": "CONNECTOR_NAME",
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"topics": "TOPIC_LIST",
"project": "GOOGLE_CLOUD_PROJECT_NAME",
"defaultDataset": ".*=BIGQUERY_DATASET_NAME",
"schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
"schemaRegistryClient.basic.auth.credentials.source": "URL",
"schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"autoCreateTables": "true",
"allBQFieldsNullable": "true",
"keySource": "JSON",
"keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}
The configuration file includes:
- name: Connector name
- connector.class: Must be com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
- topics: Comma-separated list of Kafka topics to write to BigQuery
- project: Target Google Cloud project name
- defaultDataset: BigQuery dataset name, prefixed with .*=
If your messages are in Avro format, also set these parameters:
- schemaRegistryLocation: Karapace schema registry endpoint
- key.converter and value.converter: Set to io.confluent.connect.avro.AvroConverter for Avro
- key.converter.schema.registry.url and value.converter.schema.registry.url: Schema registry URL (https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT)
- key.converter.schema.registry.basic.auth.user.info and value.converter.schema.registry.basic.auth.user.info: Schema registry credentials in the format SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD
For table management, you can set:
- autoCreateTables: Create target BigQuery tables automatically if they do not exist
- allBQFieldsNullable: Set new BigQuery fields to NULLABLE instead of REQUIRED
For schema evolution, you can also set:
- allowNewBigQueryFields: Add new fields from Kafka schemas to BigQuery tables
- allowBigQueryRequiredFieldRelaxation: Relax REQUIRED fields to NULLABLE
Automatic schema evolution reduces control over table definitions and may cause errors if message schemas change in ways BigQuery cannot support.
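As a sketch, enabling both options adds the following lines alongside the other properties in bigquery_sink.json:

"allowNewBigQueryFields": "true",
"allowBigQueryRequiredFieldRelaxation": "true",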
For authentication, set:
- keySource: Format of the Google Cloud service account key, set to JSON
- keyfile: Google Cloud service account key as an escaped string
Create a BigQuery sink connector
Aiven Console
- Go to the Aiven Console.
- Select your Aiven for Apache Kafka® or Kafka Connect service.
- Click Connectors.
- Click Create connector (enable Kafka Connect if required).
- Select Google BigQuery Sink from the list.
- On the Common tab, click Edit in the Connector configuration box.
- Paste the contents of bigquery_sink.json, replacing placeholders with actual values.
- Click Apply, then Create connector.
- Verify the connector status on the Connectors page.
- Check that data appears in the BigQuery dataset. By default, table names match topic names. Use the Kafka Connect RegexRouter transformation to rename tables if required, as in the sketch below.
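For example, a sketch of a RegexRouter transform that strips a hypothetical team_ prefix from topic names before they are used as table names (the transform alias renameTables and the prefix are illustrative only); add it to the connector configuration:

"transforms": "renameTables",
"transforms.renameTables.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTables.regex": "team_(.*)",
"transforms.renameTables.replacement": "$1"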
Aiven CLI
Run the following command to create the connector:
avn service connector create SERVICE_NAME @bigquery_sink.json
Parameters:
- SERVICE_NAME: Name of your Aiven for Apache Kafka service
- @bigquery_sink.json: Path to the connector configuration file
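As a sketch, you can then check the connector status with the Aiven CLI and preview the target table with the bq tool. TOPIC_NAME stands for one of the topics in TOPIC_LIST; by default, the table name matches the topic name.

# Check that the connector and its tasks are running
avn service connector status SERVICE_NAME CONNECTOR_NAME

# Preview the rows written to the corresponding BigQuery table
bq head -n 10 'GOOGLE_CLOUD_PROJECT_NAME:BIGQUERY_DATASET_NAME.TOPIC_NAME'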
Examples
Sink a JSON topic
Suppose you have a topic iot_measurements containing JSON messages with an inline schema:
{
"schema": {
"type": "struct",
"fields": [
{ "type": "int64", "field": "iot_id" },
{ "type": "string", "field": "metric" },
{ "type": "int32", "field": "measurement" }
]
},
"payload": { "iot_id": 1, "metric": "Temperature", "measurement": 14 }
}
Connector configuration:
{
"name": "iot_sink",
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"topics": "iot_measurements",
"project": "GOOGLE_CLOUD_PROJECT_NAME",
"defaultDataset": ".*=BIGQUERY_DATASET_NAME",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"autoCreateTables": "true",
"keySource": "JSON",
"keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}
- topics: Source topic
- value.converter: JSON converter; no schema registry is needed because the schema is embedded in each message
Inline JSON schemas increase message size and add processing overhead. For efficiency, prefer Avro format with Karapace Schema Registry.
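As a sketch, you could produce a test message like the one above with kcat and jq, assuming the service’s SSL certificates (service.key, service.cert, ca.pem) have been downloaded from the Aiven Console, APACHE_KAFKA_PORT is your service’s Kafka port, and the example message is saved in iot_measurement.json:

# Compact the example message to a single line and produce it to the topic
jq -c . iot_measurement.json | kcat \
  -b APACHE_KAFKA_HOST:APACHE_KAFKA_PORT \
  -X security.protocol=ssl \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.cert \
  -X ssl.ca.location=ca.pem \
  -t iot_measurements -P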
Sink an Avro topic
Suppose you have a topic students with messages in Avro format and schemas stored in Karapace.
Connector configuration:
{
"name": "students_sink",
"connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
"topics": "students",
"project": "GOOGLE_CLOUD_PROJECT_NAME",
"defaultDataset": ".*=BIGQUERY_DATASET_NAME",
"schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
"schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
"autoCreateTables": "true",
"keySource": "JSON",
"keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}
- topics: Source topic
- key.converter and value.converter: Enable Avro parsing with the Karapace schema registry
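As a sketch, you can confirm that the value schema for the students topic is registered in Karapace, which exposes the standard Schema Registry REST API; the students-value subject name assumes the default subject naming strategy:

# List registered subjects, then fetch the latest value schema for the topic
curl -s -u SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD \
  https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT/subjects

curl -s -u SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD \
  https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT/subjects/students-value/versions/latest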