Create a Google Cloud Storage sink connector for Apache Kafka®
The Google Cloud Storage (GCS) sink connector moves data from Aiven for Apache Kafka® topics to a Google Cloud Storage bucket for long-term storage.
Prerequisites
- An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Kafka Connect cluster
- Access to a Google Cloud project where you can create:
  - a Google Cloud Storage bucket
  - a Google service account with a JSON service key
- The following values, collected for the connector configuration:
  - GCS_NAME: the name of the target Google Cloud Storage bucket
  - GCS_CREDENTIALS: the Google service account JSON key
For a full list of configuration options, see the Google Cloud Storage sink connector documentation.
The connector expects GCS_CREDENTIALS as a single JSON string. Escape all " symbols
as \".
Example:
{\"type\":\"service_account\",\"project_id\":\"XXXXXX\",...}
If the private_key field contains \n, escape it as \\n.
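Rather than escaping the key by hand, you can generate the escaped string from the downloaded key file. The following is a minimal Python sketch, assuming the key was saved as service-key.json (a hypothetical filename); it prints the value, surrounding quotes included, ready to paste into the configuration:

import json

# Load the downloaded service account key (hypothetical filename).
with open("service-key.json") as f:
    key = json.load(f)

# The inner dumps() produces a compact one-line JSON string; the outer dumps()
# wraps it in quotes, escaping every " as \" and each newline in private_key as \\n.
escaped = json.dumps(json.dumps(key, separators=(",", ":")))
print(escaped)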
Google credential source restrictions
When using Google Cloud external account credentials with Aiven for Apache Kafka® Connect, Aiven applies security restrictions to prevent unauthorized file access and network requests.
If the Google Cloud credential JSON file includes a credential_source object, the
following restrictions apply:
- credential_source.file: not allowed
- credential_source.executable.command: not allowed
- credential_source.url: allowed only when allow-listed in the service configuration (gcp_auth_allowed_urls)
Example credential_source object using a URL-based credential:
{
  "credential_source": {
    "url": "https://sts.googleapis.com/v1/token",
    "headers": {
      "Metadata-Flavor": "Google"
    }
  }
}
gcp_auth_allowed_urls is a Kafka Connect service-level configuration, not a
connector configuration. Configure it in the Kafka Connect service settings. This setting
applies to all connectors in the service.
Configure allowed authentication URLs
To use URL-based credentials (credential_source.url):
- Configure the Kafka Connect service with allowed authentication URLs.
- Configure each connector to use one of the allowed URLs.
Set gcp_auth_allowed_urls on the Kafka Connect service to define which HTTPS
authentication endpoints the service can access. This setting applies to all connectors
in the service.
Console
- Go to the Aiven Console.
- Select the Aiven for Apache Kafka Connect service.
- Click Service settings.
- In Advanced configuration, click Configure.
- Set gcp_auth_allowed_urls to the required HTTPS endpoints.
- Click Save configuration.

CLI
Run the following command:
avn service update KAFKA_CONNECT_SERVICE_NAME \
-c gcp_auth_allowed_urls='["https://sts.googleapis.com","https://iamcredentials.googleapis.com"]'
If multiple connectors use URL-based credentials, add all required authentication URLs to
gcp_auth_allowed_urls. Each unique URL needs to be added only once.
If credential_source.url is set but the URL is not included in gcp_auth_allowed_urls,
connector creation fails.
In the connector configuration JSON, set credential_source.url to match one of the URLs
configured in the service.
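Because connector creation fails on a mismatch, it can help to check the credential before submitting it. This is a minimal Python sketch, assuming the connector configuration is saved as gcs_sink.json with real credentials embedded, and assuming a simple prefix match against the allow-list (the exact matching rules are defined by the service, not by this sketch):

import json

# URLs configured in gcp_auth_allowed_urls on the Kafka Connect service.
ALLOWED_URLS = ["https://sts.googleapis.com", "https://iamcredentials.googleapis.com"]

with open("gcs_sink.json") as f:
    connector = json.load(f)

# The credentials value is itself a JSON string; parse it to inspect it.
credentials = json.loads(connector["gcs.credentials.json"])
source_url = credentials.get("credential_source", {}).get("url")

if source_url and not any(source_url.startswith(u) for u in ALLOWED_URLS):
    raise SystemExit(f"credential_source.url {source_url!r} is not allow-listed")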
Configure Google Cloud for the connector
Create a Google Cloud Storage bucket and a Google service account key that the connector can use to write objects.
Create a Google Cloud Storage bucket
- In the Google Cloud console, open Cloud Storage.
- Create a bucket using the Cloud Storage buckets page.
- Specify the bucket name and location.
- Keep the other settings as default unless your organization requires otherwise.
Create a Google service account and JSON key
- Create a Google service account and JSON service key by following Google authentication instructions.
- Download the JSON service key.
You use this key in the connector configuration as GCS_CREDENTIALS.
Grant the service account access to the bucket
- Open the bucket in the Cloud Storage console.
- Go to the Permissions tab.
- Grant access to the service account.
Ensure the following permissions are granted:
- storage.objects.create
- storage.objects.delete (required for overwriting, for example during re-processing)
Grant these permissions using a custom role or the standard role Storage Legacy Bucket Writer.
Also ensure the bucket does not have a retention policy that prevents overwriting.
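To confirm the key has the required permissions before creating the connector, you can write and then delete a test object with the google-cloud-storage client. A minimal sketch, assuming the hypothetical key filename service-key.json and bucket name my-test-bucket:

from google.cloud import storage  # pip install google-cloud-storage

# Authenticate with the downloaded service account key (hypothetical filename).
client = storage.Client.from_service_account_json("service-key.json")
bucket = client.bucket("my-test-bucket")

# Exercises storage.objects.create, then storage.objects.delete.
blob = bucket.blob("connector-permission-check")
blob.upload_from_string("test")
blob.delete()
print("create and delete permissions verified")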
Create the connector configuration
Create a JSON configuration file, for example, gcs_sink.json:
{
  "name": "my-gcs-connector",
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "tasks.max": "1",
  "topics": "TOPIC_NAME",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "gcs.credentials.json": "GCS_CREDENTIALS",
  "gcs.bucket.name": "GCS_NAME",
  "file.name.prefix": "my-custom-prefix/",
  "file.compression.type": "gzip",
  "format.output.type": "jsonl",
  "format.output.fields": "value,offset"
}
Parameters:
- name: the connector name
- topics: comma-separated list of Apache Kafka® topics to sink to the bucket
- key.converter and value.converter: message converters, based on your topic format
- gcs.credentials.json: the Google service account JSON key as a JSON string
- gcs.bucket.name: the name of the target bucket
- file.name.prefix: prefix for files created in the bucket
- file.compression.type: compression type for output files
- format.output.type: output file format
- format.output.fields: message fields to include in output files
You can control file naming and output formats using dedicated parameters. For details, see GCS sink formats.
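Embedding the escaped key by hand is error-prone, so you can also generate the whole file programmatically: serializing the configuration with a standard JSON library applies the required \" escaping for you. A minimal Python sketch, assuming the hypothetical filenames service-key.json and gcs_sink.json:

import json

# Compact the downloaded key into a single-line JSON string.
with open("service-key.json") as f:
    credentials = json.dumps(json.load(f), separators=(",", ":"))

config = {
    "name": "my-gcs-connector",
    "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
    "tasks.max": "1",
    "topics": "TOPIC_NAME",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "gcs.credentials.json": credentials,  # quotes are escaped on serialization
    "gcs.bucket.name": "GCS_NAME",
    "file.name.prefix": "my-custom-prefix/",
    "file.compression.type": "gzip",
    "format.output.type": "jsonl",
    "format.output.fields": "value,offset",
}

with open("gcs_sink.json", "w") as f:
    json.dump(config, f, indent=2)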
Create a Google Cloud Storage sink connector
Aiven Console
- Access the Aiven Console.
- Select your Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service.
- In the sidebar, click Connectors.
- Click Create connector if Apache Kafka Connect is already enabled on the service. If not, click Enable connector on this service. To enable connectors:
  - In the sidebar, click Service settings.
  - In the Service management section, click Actions > Enable Kafka Connect.
- In the list of sink connectors, click Get started under Google Cloud Storage sink.
- On the connector page, open the Common tab.
- In Connector configuration, click Edit.
- Paste the configuration from your gcs_sink.json file into the text box. Replace placeholders with your actual values.
- Click Apply.
  Note: When you paste the JSON configuration, the Aiven Console parses it and automatically populates the corresponding fields in the UI. Any changes you make in the UI are reflected in the Connector configuration JSON.
- Click Create connector.
- Verify the connector status on the Connectors page.
- Confirm that data from the Apache Kafka topics appears in the target bucket.

Aiven CLI
To create a GCS sink connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @gcs_sink.json
Parameters:
- SERVICE_NAME: the name of your Aiven for Apache Kafka® service
- @gcs_sink.json: the path to your connector configuration file
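As an alternative to the CLI, you can create the connector by calling the standard Kafka Connect REST API exposed by the service. A minimal sketch, assuming the requests library is installed and CONNECT_URL (hypothetical placeholder) is your service's Connect REST endpoint with credentials:

import json
import requests  # pip install requests

# Hypothetical endpoint: replace with your service's Kafka Connect REST URL.
CONNECT_URL = "https://avnadmin:PASSWORD@HOST:PORT"

with open("gcs_sink.json") as f:
    config = json.load(f)

# The Connect REST API separates the connector name from its configuration map.
payload = {"name": config["name"], "config": config}
resp = requests.post(f"{CONNECT_URL}/connectors", json=payload)
resp.raise_for_status()
print(resp.json()["name"], "created")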
Examples
Create a GCS sink connector for a JSON topic
This example creates a connector with the following settings:
- Connector name: my_gcs_sink
- Source topic: test
- Bucket name: my-test-bucket
- Name prefix: my-custom-prefix/
- Compression: gzip
- Output format: jsonl
- Output fields: value, offset
- Maximum records per file: 1
{
  "name": "my_gcs_sink",
  "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
  "topics": "test",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "gcs.credentials.json": "{\"type\": \"service_account\", \"project_id\": \"XXXXXXXXX\", ...}",
  "gcs.bucket.name": "my-test-bucket",
  "file.name.prefix": "my-custom-prefix/",
  "file.compression.type": "gzip",
  "file.max.records": "1",
  "format.output.type": "jsonl",
  "format.output.fields": "value,offset"
}
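Once the connector is running, you can confirm records are landing by listing objects under the configured prefix. A minimal sketch using the google-cloud-storage client, assuming the hypothetical key filename service-key.json:

from google.cloud import storage  # pip install google-cloud-storage

client = storage.Client.from_service_account_json("service-key.json")

# With file.max.records set to 1, expect one object per record under the prefix.
for blob in client.list_blobs("my-test-bucket", prefix="my-custom-prefix/"):
    print(blob.name, blob.size)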