Create a Google Cloud Storage sink connector for Apache Kafka®
The Google Cloud Storage (GCS) sink connector moves data from Aiven for Apache Kafka® topics to a Google Cloud Storage bucket for long-term storage.
Prerequisites
- An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Kafka Connect cluster
- Access to a Google Cloud project where you can create:
  - a Google Cloud Storage bucket
  - a Google service account with a JSON service key
- The following values, collected for the connector configuration:
  - GCS_NAME: the name of the target Google Cloud Storage bucket
  - GCS_CREDENTIALS: the Google service account JSON key
For a full list of configuration options, see the Google Cloud Storage sink connector documentation.
The connector expects GCS_CREDENTIALS as a single JSON string. Escape all " symbols
as \".
Example:
{\"type\":\"service_account\",\"project_id\":\"XXXXXX\",...}
If the private_key field contains \n, escape it as \\n.
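Rather than escaping the key by hand, you can generate the escaped string programmatically. A minimal sketch in Python (the key contents below are placeholders, not a real service account):

```python
import json

# Stand-in service account key; all values are placeholders.
key = {
    "type": "service_account",
    "project_id": "XXXXXX",
    "private_key": "-----BEGIN PRIVATE KEY-----\nMIIE...\n-----END PRIVATE KEY-----\n",
}

# Serialize the key compactly, then escape the result for embedding
# inside another JSON string: json.dumps of a *string* adds the \"
# escapes, and turns \n in the private key into \\n.
compact = json.dumps(key, separators=(",", ":"))
escaped = json.dumps(compact)[1:-1]  # strip the surrounding quotes

print(escaped)  # {\"type\":\"service_account\",\"project_id\":\"XXXXXX\",...}
```

The printed string is what goes into the `gcs.credentials.json` field of the connector configuration.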
Configure Google Cloud for the connector
Create a Google Cloud Storage bucket and a Google service account key that the connector can use to write objects.
Create a Google Cloud Storage bucket
- In the Google Cloud console, open Cloud Storage.
- Create a bucket using the Cloud Storage buckets page.
- Specify the bucket name and location.
- Keep the other settings as default unless your organization requires otherwise.
Create a Google service account and JSON key
- Create a Google service account and JSON service key by following Google authentication instructions.
- Download the JSON service key.
You use this key in the connector configuration as GCS_CREDENTIALS.
Grant the service account access to the bucket
- Open the bucket in the Cloud Storage console.
- Go to the Permissions tab.
- Grant access to the service account.
Ensure the following permissions are granted:
- storage.objects.create
- storage.objects.delete (required for overwriting, for example during re-processing)
Grant these permissions using a custom role or the standard role Storage Legacy Bucket Writer.
Also ensure the bucket does not have a retention policy that prevents overwriting.
Create the connector configuration
Create a JSON configuration file, for example, gcs_sink.json:
{
"name": "my-gcs-connector",
"connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"topics": "TOPIC_NAME",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"gcs.credentials.json": "GCS_CREDENTIALS",
"gcs.bucket.name": "GCS_NAME",
"file.name.prefix": "my-custom-prefix/",
"file.compression.type": "gzip",
"format.output.type": "jsonl",
"format.output.fields": "value,offset"
}
Parameters:
- name: The connector name
- topics: Comma-separated list of Apache Kafka® topics to sink to the bucket
- key.converter and value.converter: Message converters, based on your topic format
- gcs.credentials.json: The Google service account JSON key as an escaped JSON string
- gcs.bucket.name: The name of the target bucket
- file.name.prefix: Prefix for files created in the bucket
- file.compression.type: Compression type for output files
- format.output.type: Output file format
- format.output.fields: Message fields to include in output files
You can control file naming and output formats using dedicated parameters. For details, see GCS sink formats.
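As a rough mental model of how those parameters combine, the sketch below builds an object name from the prefix, topic, partition, and starting offset, appending `.gz` when gzip compression is enabled. The exact naming template is an assumption here; confirm it against the GCS sink formats documentation before relying on it:

```python
def object_name(prefix: str, topic: str, partition: int,
                start_offset: int, compression: str = "none") -> str:
    """Sketch of a GCS object name built from connector settings.

    Assumes the pattern <prefix><topic>-<partition>-<start_offset>,
    with ".gz" appended for gzip compression; verify against the
    GCS sink formats docs.
    """
    name = f"{prefix}{topic}-{partition}-{start_offset}"
    if compression == "gzip":
        name += ".gz"
    return name

print(object_name("my-custom-prefix/", "test", 0, 42, "gzip"))
# my-custom-prefix/test-0-42.gz
```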
Create a Google Cloud Storage sink connector
- Aiven Console
- Aiven CLI
- Access the Aiven Console.
- Select your Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service.
- In the sidebar, click Connectors.
- Click Create connector if Apache Kafka Connect is already enabled on the service. If not, click Enable connector on this service.
  To enable connectors:
  - In the sidebar, click Service settings.
  - In the Service management section, click Actions > Enable Kafka Connect.
- In the list of sink connectors, click Get started under Google Cloud Storage sink.
- On the connector page, open the Common tab.
- In Connector configuration, click Edit.
- Paste the configuration from your gcs_sink.json file into the text box, and replace the placeholders with your actual values.
- Click Apply.
  Note: When you paste the JSON configuration, the Aiven Console parses it and automatically populates the corresponding fields in the UI. Any changes you make in the UI are reflected in the Connector configuration JSON.
- Click Create connector.
- Verify the connector status on the Connectors page.
- Confirm that data from the Apache Kafka topics appears in the target bucket.
To create a GCS sink connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @gcs_sink.json
Parameters:
- SERVICE_NAME: The name of your Aiven for Apache Kafka® service
- @gcs_sink.json: The path to your connector configuration file
Examples
Create a GCS sink connector for a JSON topic
This example creates a connector with the following settings:
- Connector name: my_gcs_sink
- Source topic: test
- Bucket name: my-test-bucket
- Name prefix: my-custom-prefix/
- Compression: gzip
- Output format: jsonl
- Output fields: value, offset
- Maximum records per file: 1
{
"name": "my_gcs_sink",
"connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
"topics": "test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"gcs.credentials.json": "{\"type\": \"service_account\", \"project_id\": \"XXXXXXXXX\", ...}",
"gcs.bucket.name": "my-test-bucket",
"file.name.prefix": "my-custom-prefix/",
"file.compression.type": "gzip",
"file.max.records": "1",
"format.output.type": "jsonl",
"format.output.fields": "value,offset"
}
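With format.output.type set to jsonl and format.output.fields set to value,offset, each Kafka record is written as one JSON line containing those two fields, and file.max.records of 1 means each object holds a single line. The sketch below illustrates the assumed shape of such a line; the exact output schema should be checked against the GCS sink formats documentation:

```python
import json

# Hypothetical message value and offset for illustration only.
record_value = {"name": "example"}
record_offset = 7

# Assumed jsonl line shape for format.output.fields = value,offset.
line = json.dumps({"value": record_value, "offset": record_offset})
print(line)
```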