Create a Google Cloud Storage sink connector for Apache Kafka®

The Google Cloud Storage (GCS) sink connector moves data from Aiven for Apache Kafka® topics to a Google Cloud Storage bucket for long-term storage.

Prerequisites

  • An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Kafka Connect cluster

  • Access to a Google Cloud project where you can create:

    • a Google Cloud Storage bucket
    • a Google service account with a JSON service key
  • Collect the following values for connector configuration:

    • GCS_NAME: The name of the target Google Cloud Storage bucket
    • GCS_CREDENTIALS: The Google service account JSON key
note

For a full list of configuration options, see the Google Cloud Storage sink connector documentation.

warning

The connector expects GCS_CREDENTIALS as a single JSON string. Escape all " symbols as \".

Example:

{\"type\":\"service_account\",\"project_id\":\"XXXXXX\",...}

If the private_key field contains \n, escape it as \\n.
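If you build the connector configuration programmatically, you can let a JSON serializer produce this escaping for you. A minimal Python sketch, assuming the key was downloaded as service-account-key.json (the file name and surrounding config keys are illustrative):

import json

# Read the downloaded service account key file as raw text; in the file,
# the private_key field already contains \n escape sequences.
with open("service-account-key.json") as f:
    key_json = f.read()

config = {
    "name": "my-gcs-connector",
    "gcs.credentials.json": key_json,
    # ... remaining connector settings ...
}

# Serializing the whole config escapes every " as \" and every backslash
# as \\, producing the single-string format the connector expects.
print(json.dumps(config, indent=2))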

Google credential source restrictions

When using Google Cloud external account credentials with Aiven for Apache Kafka® Connect, Aiven applies security restrictions to prevent unauthorized file access and network requests.

If the Google Cloud credential JSON file includes a credential_source object, the following restrictions apply:

  • credential_source.file: Not allowed
  • credential_source.executable.command: Not allowed
  • credential_source.url: Allowed only when allow-listed in the service configuration (gcp_auth_allowed_urls)

Example credential_source object using a URL-based credential:

{
  "credential_source": {
    "url": "https://sts.googleapis.com/v1/token",
    "headers": {
      "Metadata-Flavor": "Google"
    }
  }
}
important

gcp_auth_allowed_urls is a Kafka Connect service-level configuration, not a connector configuration. Configure it in the Kafka Connect service settings. This setting applies to all connectors in the service.

Configure allowed authentication URLs

To use URL-based credentials (credential_source.url), configure the following:

  1. Configure the Kafka Connect service with allowed authentication URLs.
  2. Configure each connector to use one of the allowed URLs.

Set gcp_auth_allowed_urls on the Kafka Connect service to define which HTTPS authentication endpoints the service can access.

  1. Go to the Aiven Console.
  2. Select the Aiven for Apache Kafka Connect service.
  3. Click Service settings.
  4. In Advanced configuration, click Configure.
  5. Set gcp_auth_allowed_urls to the required HTTPS endpoints.
  6. Click Save configuration.

If multiple connectors use URL-based credentials, add all required authentication URLs to gcp_auth_allowed_urls. Each unique URL needs to be added only once.

If credential_source.url is set but the URL is not included in gcp_auth_allowed_urls, connector creation fails.
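If you manage services through the Aiven API rather than the console steps above, the same setting can be applied by updating the service's user configuration. A hedged Python sketch: the endpoint and token header follow standard Aiven API conventions, but the exact shape of the gcp_auth_allowed_urls value (shown here as a list of URLs) is an assumption to verify against the advanced configuration reference:

import os

import requests

PROJECT = "my-project"        # placeholder project name
SERVICE = "my-kafka-connect"  # placeholder service name

# Assumption: gcp_auth_allowed_urls is a top-level user_config key that
# accepts a list of HTTPS endpoints.
resp = requests.put(
    f"https://api.aiven.io/v1/project/{PROJECT}/service/{SERVICE}",
    headers={"Authorization": f"aivenv1 {os.environ['AIVEN_TOKEN']}"},
    json={"user_config": {"gcp_auth_allowed_urls": ["https://sts.googleapis.com/v1/token"]}},
)
resp.raise_for_status()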

In the connector configuration JSON, set credential_source.url to match one of the URLs configured in the service.

Configure Google Cloud for the connector

Create a Google Cloud Storage bucket and a Google service account key that the connector can use to write objects.

Create a Google Cloud Storage bucket

  1. In the Google Cloud console, open Cloud Storage.
  2. Create a bucket using the Cloud Storage buckets page.
  3. Specify the bucket name and location.
  4. Keep the other settings as default unless your organization requires otherwise.
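As an alternative to the console steps above, you can create the bucket with the google-cloud-storage client library. A minimal sketch, assuming Application Default Credentials with permission to create buckets; the project ID, bucket name, and location are placeholders:

from google.cloud import storage

client = storage.Client(project="my-project")

# Location and other settings default sensibly; override only if your
# organization requires it.
bucket = client.create_bucket("my-test-bucket", location="EU")
print(f"Created bucket {bucket.name}")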

Create a Google service account and JSON key

  1. Create a Google service account and JSON service key by following Google Cloud's authentication instructions.
  2. Download the JSON service key.

You use this key in the connector configuration as GCS_CREDENTIALS.

Grant the service account access to the bucket

  1. Open the bucket in the Cloud Storage console.
  2. Go to the Permissions tab.
  3. Grant access to the service account.

Ensure the following permissions are granted:

  • storage.objects.create
  • storage.objects.delete (required for overwriting, for example during re-processing)

Grant these permissions using a custom role or the standard role Storage Legacy Bucket Writer.

Also ensure the bucket does not have a retention policy that prevents overwriting.
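If you script access grants, the same binding can be added through the bucket's IAM policy. A sketch using google-cloud-storage; the bucket name and service account email are placeholders:

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-test-bucket")

# Bind the connector's service account to Storage Legacy Bucket Writer,
# which includes storage.objects.create and storage.objects.delete.
policy = bucket.get_iam_policy(requested_policy_version=3)
policy.bindings.append({
    "role": "roles/storage.legacyBucketWriter",
    "members": {"serviceAccount:connector@my-project.iam.gserviceaccount.com"},
})
bucket.set_iam_policy(policy)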

Create the connector configuration

Create a JSON configuration file, for example, gcs_sink.json:

{
  "name": "my-gcs-connector",
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "tasks.max": "1",
  "topics": "TOPIC_NAME",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "gcs.credentials.json": "GCS_CREDENTIALS",
  "gcs.bucket.name": "GCS_NAME",
  "file.name.prefix": "my-custom-prefix/",
  "file.compression.type": "gzip",
  "format.output.type": "jsonl",
  "format.output.fields": "value,offset"
}

Parameters:

  • name: The connector name
  • topics: Comma-separated list of Apache Kafka® topics to sink to the bucket
  • key.converter and value.converter: Message converters based on your topic format
  • gcs.credentials.json: The Google service account JSON key as a JSON string
  • gcs.bucket.name: The name of the target bucket
  • file.name.prefix: Prefix for files created in the bucket
  • file.compression.type: Compression type for output files
  • format.output.type: Output file format
  • format.output.fields: Message fields to include in output files
tip

You can control file naming and output formats using dedicated parameters. For details, see GCS sink formats.
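As an alternative to the Aiven Console flow described in the next section, you can submit gcs_sink.json directly to the Kafka Connect REST API. A hedged Python sketch; the service URL and credentials are placeholders taken from your service's connection information:

import json

import requests

CONNECT_URL = "https://my-connect.my-project.aivencloud.com:443"  # placeholder

with open("gcs_sink.json") as f:
    flat = json.load(f)

# The Connect REST API takes the connector name separately from the rest
# of the configuration.
payload = {"name": flat.pop("name"), "config": flat}

resp = requests.post(
    f"{CONNECT_URL}/connectors",
    auth=("avnadmin", "SERVICE_PASSWORD"),  # placeholder credentials
    json=payload,
)
resp.raise_for_status()
print(resp.json())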

Create a Google Cloud Storage sink connector

  1. Access the Aiven Console.

  2. Select your Aiven for Apache Kafka® or Aiven for Apache Kafka® Connect service.

  3. In the sidebar, click Connectors.

  4. Click Create connector if Apache Kafka Connect is already enabled on the service. If not, click Enable connector on this service.

    To enable connectors:

    1. In the sidebar, click Service settings.
    2. In the Service management section, click Actions > Enable Kafka Connect.
  5. In the list of sink connectors, click Get started under Google Cloud Storage sink.

  6. On the connector page, open the Common tab.

  7. In Connector configuration, click Edit.

  8. Paste the configuration from your gcs_sink.json file into the text box. Replace placeholders with your actual values.

  9. Click Apply.

    note

    When you paste the JSON configuration, Aiven Console parses it and automatically populates the corresponding fields in the UI. Any changes you make in the UI are reflected in the Connector configuration JSON.

  10. Click Create connector.

  11. Verify the connector status on the Connectors page.

  12. Confirm that data from the Apache Kafka topics appears in the target bucket.
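To check the bucket contents programmatically, list the objects under the configured prefix. A short sketch with placeholder names:

from google.cloud import storage

client = storage.Client()

# Objects written by the connector appear under file.name.prefix.
for blob in client.list_blobs("my-test-bucket", prefix="my-custom-prefix/"):
    print(blob.name, blob.size)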

Examples

Create a GCS sink connector for a JSON topic

This example creates a connector with the following settings:

  • Connector name: my_gcs_sink
  • Source topic: test
  • Bucket name: my-test-bucket
  • Name prefix: my-custom-prefix/
  • Compression: gzip
  • Output format: jsonl
  • Output fields: value, offset
  • Maximum records per file: 1

{
  "name": "my_gcs_sink",
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "topics": "test",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "gcs.credentials.json": "{\"type\": \"service_account\", \"project_id\": \"XXXXXXXXX\", ...}",
  "gcs.bucket.name": "my-test-bucket",
  "file.name.prefix": "my-custom-prefix/",
  "file.compression.type": "gzip",
  "file.max.records": "1",
  "format.output.type": "jsonl",
  "format.output.fields": "value,offset"
}
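To inspect what the connector produced, download and decompress one of the files. A sketch assuming the connector's default object naming (the exact object name below is illustrative); with file.max.records set to 1, each file holds a single record:

import gzip
import json

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-test-bucket")

# Object names follow the connector's naming scheme plus the compression
# extension; adjust to a name you see in the bucket.
blob = bucket.blob("my-custom-prefix/test-0-0000000000.gz")

with gzip.open(blob.open("rb"), mode="rt") as f:
    for line in f:
        # One JSON object per line with the configured output fields.
        print(json.loads(line))  # e.g. {"value": ..., "offset": 0}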