Create a Google Cloud Storage sink connector for Apache Kafka®

The Google Cloud Storage (GCS) sink connector moves data from Aiven for Apache Kafka® topics to a Google Cloud Storage bucket for long-term storage.

Prerequisites

  • An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Kafka Connect cluster

  • Access to a Google Cloud project where you can create:

    • a Google Cloud Storage bucket
    • a Google service account with a JSON service key
  • Collect the following values for connector configuration:

    • GCS_NAME: The name of the target Google Cloud Storage bucket
    • GCS_CREDENTIALS: The Google service account JSON key
note

For a full list of configuration options, see the Google Cloud Storage sink connector documentation.

warning

The connector expects GCS_CREDENTIALS as a single JSON string. Escape all " symbols as \".

Example:

{\"type\":\"service_account\",\"project_id\":\"XXXXXX\",...}

If the private_key field contains \n, escape it as \\n.

Configure Google Cloud for the connector

Create a Google Cloud Storage bucket and a Google service account key that the connector can use to write objects.

Create a Google Cloud Storage bucket

  1. In the Google Cloud console, open Cloud Storage.
  2. Create a bucket using the Cloud Storage buckets page.
  3. Specify the bucket name and location.
  4. Keep the other settings as default unless your organization requires otherwise.

Create a Google service account and JSON key

  1. Create a Google service account and JSON service key by following the Google Cloud authentication documentation.
  2. Download the JSON service key.

You use this key in the connector configuration as GCS_CREDENTIALS.

Grant the service account access to the bucket

  1. Open the bucket in the Cloud Storage console.
  2. Go to the Permissions tab.
  3. Grant access to the service account.

Ensure the following permissions are granted:

  • storage.objects.create
  • storage.objects.delete (required for overwriting, for example during re-processing)

Grant these permissions using a custom role or the standard role Storage Legacy Bucket Writer.

Also ensure the bucket does not have a retention policy that prevents overwriting.
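As a sketch of the same steps with the `gsutil` CLI, assuming placeholder bucket and service account names:

```shell
# Placeholders; replace with your bucket and service account.
BUCKET=my-test-bucket
SA=my-connector-sa@my-project.iam.gserviceaccount.com

# Grant the predefined Storage Legacy Bucket Writer role, which
# includes storage.objects.create and storage.objects.delete.
gsutil iam ch "serviceAccount:${SA}:roles/storage.legacyBucketWriter" "gs://${BUCKET}"

# Confirm that no retention policy blocks overwriting objects.
gsutil retention get "gs://${BUCKET}"
```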

Create the connector configuration

Create a JSON configuration file, for example, gcs_sink.json:

{
  "name": "my-gcs-connector",
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "tasks.max": "1",
  "topics": "TOPIC_NAME",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "gcs.credentials.json": "GCS_CREDENTIALS",
  "gcs.bucket.name": "GCS_NAME",
  "file.name.prefix": "my-custom-prefix/",
  "file.compression.type": "gzip",
  "format.output.type": "jsonl",
  "format.output.fields": "value,offset"
}

Parameters:

  • name: The connector name
  • topics: Comma-separated list of Apache Kafka® topics to sink to the bucket
  • key.converter and value.converter: Message converters based on your topic format
  • gcs.credentials.json: The Google service account JSON key as a JSON string
  • gcs.bucket.name: The name of the target bucket
  • file.name.prefix: Prefix for files created in the bucket
  • file.compression.type: Compression type for output files
  • format.output.type: Output file format
  • format.output.fields: Message fields to include in output files
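These parameters map one-to-one onto keys in the JSON file, so the file can also be assembled and validated programmatically. A minimal sketch (topic, bucket, and credential values are the same placeholders as above):

```python
import json

# Placeholder values; replace TOPIC_NAME, GCS_CREDENTIALS, and
# GCS_NAME with your actual topic, escaped key, and bucket name.
config = {
    "name": "my-gcs-connector",
    "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    "tasks.max": "1",
    "topics": "TOPIC_NAME",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "gcs.credentials.json": "GCS_CREDENTIALS",
    "gcs.bucket.name": "GCS_NAME",
    "file.name.prefix": "my-custom-prefix/",
    "file.compression.type": "gzip",
    "format.output.type": "jsonl",
    "format.output.fields": "value,offset",
}

# Writing with json.dump guarantees the file is well-formed JSON.
with open("gcs_sink.json", "w") as f:
    json.dump(config, f, indent=2)
```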
tip

You can control file naming and output formats using dedicated parameters. For details, see GCS sink formats.

Create a Google Cloud Storage sink connector

  1. Access the Aiven Console.

  2. Select your Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service.

  3. In the sidebar, click Connectors.

  4. If Apache Kafka Connect is already enabled on the service, click Create connector. Otherwise, click Enable connector on this service.

    To enable connectors:

    1. In the sidebar, click Service settings.
    2. In the Service management section, click Actions > Enable Kafka Connect.
  5. In the list of sink connectors, click Get started under Google Cloud Storage sink.

  6. On the connector page, open the Common tab.

  7. In Connector configuration, click Edit.

  8. Paste the configuration from your gcs_sink.json file into the text box. Replace placeholders with your actual values.

  9. Click Apply.

    note

    When you paste the JSON configuration, Aiven Console parses it and automatically populates the corresponding fields in the UI. Any changes you make in the UI are reflected in the Connector configuration JSON.

  10. Click Create connector.

  11. Verify the connector status on the Connectors page.

  12. Confirm that data from the Apache Kafka topics appears in the target bucket.
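The console steps above can also be performed with the Aiven CLI, shown here as a sketch with a placeholder service name:

```shell
# Create the connector from the configuration file prepared earlier;
# kafka-service-name is a placeholder for your Aiven service.
avn service connector create kafka-service-name @gcs_sink.json

# Check that the connector and its tasks are running.
avn service connector status kafka-service-name my-gcs-connector
```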

Examples

Create a GCS sink connector for a JSON topic

This example creates a connector with the following settings:

  • Connector name: my_gcs_sink
  • Source topic: test
  • Bucket name: my-test-bucket
  • Name prefix: my-custom-prefix/
  • Compression: gzip
  • Output format: jsonl
  • Output fields: value, offset
  • Maximum records per file: 1

{
  "name": "my_gcs_sink",
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "topics": "test",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "gcs.credentials.json": "{\"type\": \"service_account\", \"project_id\": \"XXXXXXXXX\", ...}",
  "gcs.bucket.name": "my-test-bucket",
  "file.name.prefix": "my-custom-prefix/",
  "file.compression.type": "gzip",
  "file.max.records": "1",
  "format.output.type": "jsonl",
  "format.output.fields": "value,offset"
}
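With `file.max.records` set to 1, each object in the bucket holds a single gzip-compressed jsonl line. The sketch below illustrates what such a file could look like for one record; the record layout and key names here are assumptions for illustration, not captured connector output:

```python
import gzip
import io
import json

# Assumed shape of one output line for format.output.fields value,offset:
# a JSON object holding the selected fields.
record_line = json.dumps({"value": {"name": "example"}, "offset": 0})

# Compress the single line the way a gzip-compressed file would store it.
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
    gz.write((record_line + "\n").encode())

# Decompressing the object recovers the original jsonl line.
restored = gzip.decompress(buf.getvalue()).decode()
print(restored)
```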