Create a sink connector from Apache Kafka® to Google Cloud Storage
The Apache Kafka Connect® Google Cloud Storage (GCS) sink connector by Aiven enables you to move data from an Aiven for Apache Kafka® cluster to a Google Cloud Storage bucket for long-term storage.
The full connector documentation, including the complete set of available parameters and configuration options, is available in the dedicated GitHub repository.
Prerequisites
- An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect cluster.
- A prepared GCP account and GCS sink, along with the following information about the target GCS bucket:
  - GCS_NAME: The name of the GCS bucket.
  - GCS_CREDENTIALS: The Google service account JSON service key created during the prerequisite phase.
The GCS sink connector accepts the GCS_CREDENTIALS JSON service key as a string, therefore all " symbols within it must be escaped as \". The GCS_CREDENTIALS parameter should be in the format
{\"type\": \"service_account\",\"project_id\": \"XXXXXX\", ...}
Additionally, any \n symbols contained in the private_key field need to be escaped by substituting them with \\n.
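Escaping the key by hand is error-prone, so it can help to let a JSON library do it. The Python sketch below (the helper names escape_service_key and escape_service_key_file are hypothetical, and it assumes the key file downloaded from Google Cloud is valid JSON) serializes the key twice with the standard json module: the first pass produces the key as a JSON document, and the second pass escapes every " as \" and every \n as \\n, which matches the format described above.

```python
import json
from pathlib import Path


def escape_service_key(key: dict) -> str:
    r"""Return a service account key in the escaped form used for GCS_CREDENTIALS.

    The first json.dumps turns the key into a JSON document, in which
    newlines in private_key already appear as \n. Encoding that document
    again as a JSON string escapes each " as \" and each \n as \\n.
    The outer quotes added by the second pass are stripped so the result
    can be pasted between the quotes of "gcs.credentials.json".
    """
    as_json = json.dumps(key)
    return json.dumps(as_json)[1:-1]


def escape_service_key_file(path: str) -> str:
    """Read a service account key file and return its escaped form."""
    key = json.loads(Path(path).read_text(encoding="utf-8"))
    return escape_service_key(key)
```

For example, escape_service_key_file("service-key.json") (hypothetical file name) returns a single line that can replace GCS_CREDENTIALS in the connector configuration.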
Set up a GCS sink connector with Aiven Console
The following example demonstrates how to set up an Apache Kafka Connect® GCS sink connector using the Aiven Console.
Define an Apache Kafka Connect® configuration file
Define the connector configuration in a file (referred to here as gcs_sink.json) with the following content:
{
"name": "my-gcs-connector",
"connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "TOPIC_NAME",
"gcs.credentials.json": "GCS_CREDENTIALS",
"gcs.bucket.name": "GCS_NAME",
"file.name.prefix": "my-custom-prefix/",
"file.compression.type": "gzip",
"format.output.type": "jsonl",
"format.output.fields": "value,offset"
}
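Instead of pasting the escaped service key into the file by hand, the same configuration can be generated. The Python sketch below, with hypothetical helper names build_gcs_sink_config and write_gcs_sink_config, stores the service key as a JSON-encoded string inside a dict and lets the standard json module apply the escaping when the whole configuration is serialized. The settings mirror the configuration above; the topic and bucket arguments stand in for TOPIC_NAME and GCS_NAME.

```python
import json
from pathlib import Path


def build_gcs_sink_config(topic: str, bucket: str, service_key: dict) -> dict:
    """Return the gcs_sink.json settings as a Python dict.

    The service key is stored as a JSON-encoded string; its quotes and
    newlines are escaped when the whole dict is serialized with json.
    """
    return {
        "name": "my-gcs-connector",
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "topics": topic,
        "gcs.credentials.json": json.dumps(service_key),
        "gcs.bucket.name": bucket,
        "file.name.prefix": "my-custom-prefix/",
        "file.compression.type": "gzip",
        "format.output.type": "jsonl",
        "format.output.fields": "value,offset",
    }


def write_gcs_sink_config(path: str, config: dict) -> None:
    """Write the connector configuration as pretty-printed JSON."""
    Path(path).write_text(json.dumps(config, indent=2) + "\n", encoding="utf-8")
```

For example, write_gcs_sink_config("gcs_sink.json", build_gcs_sink_config("TOPIC_NAME", "GCS_NAME", key)), where key is the parsed service account key, writes a file whose content can be pasted into the Aiven Console.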
The configuration file contains the following entries:
- name: The connector name.
- topics: The list of Apache Kafka® topics to sink to the GCS bucket.
- key.converter and value.converter: Data converters, depending on the topic data format. Check the GitHub repository documentation for more information.
- gcs.credentials.json: The Google service account JSON service key as a JSON string.
- gcs.bucket.name: The name of the GCS bucket.
- file.name.prefix: The file name prefix.
- file.compression.type: The type of compression to use when creating the file.
- format.output.type: The format used to store the message values.
- format.output.fields: The message fields to be included in the target file.
You can define GCS sink connector naming and data formats by setting the dedicated parameters.
See the GitHub repository parameters documentation for the full list of configuration options.
Create a Kafka Connect connector with the Aiven Console
To create a Kafka Connect connector:
- Log in to the Aiven Console and select the Aiven for Apache Kafka® or Aiven for Apache Kafka Connect® service where the connector needs to be defined.
- Select Connectors from the left sidebar.
- Select Create New Connector. This option is available only for services with Kafka Connect enabled.
- Select Google Cloud Storage sink.
- In the Common tab, locate the Connector configuration text box and select Edit.
- Paste the connector configuration (stored in the gcs_sink.json file) into the form.
- Select Apply.
  Note: The Aiven Console parses the configuration file and fills the relevant UI fields. You can review the UI fields across the various tabs and change them if necessary. The changes are reflected in JSON format in the Connector configuration text box.
- After all the settings are correctly configured, select Create connector.
- Verify the connector status under the Connectors screen.
- Verify the presence of the data in the target GCS bucket.
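To inspect the content of a file written by the connector, you can download one object from the bucket and decode it locally. The Python sketch below assumes the settings used above (file.compression.type set to gzip, format.output.type set to jsonl, format.output.fields set to value,offset), so each line of the decompressed object is expected to be one JSON object with value and offset keys; that record layout is an assumption to confirm against the connector documentation. The function names read_jsonl_gzip and summarize are hypothetical.

```python
import gzip
import json
from typing import Iterator


def read_jsonl_gzip(data: bytes) -> Iterator[dict]:
    """Yield one JSON object per non-empty line of a gzip-compressed JSONL file."""
    text = gzip.decompress(data).decode("utf-8")
    for line in text.splitlines():
        if line.strip():
            yield json.loads(line)


def summarize(data: bytes) -> list[tuple[int, object]]:
    """Return (offset, value) pairs, assuming every record has both fields."""
    return [(record["offset"], record["value"]) for record in read_jsonl_gzip(data)]
```

For example, passing the bytes of a downloaded object to summarize returns the Apache Kafka offsets and message values stored in that file, which you can compare with the messages in the source topic.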
You can also create connectors with the Aiven CLI.
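As a sketch of scripting this step, the Python snippet below builds and runs an Aiven CLI call that reads the connector configuration from a file. It assumes the avn service connector create SERVICE_NAME @CONFIG_FILE syntax, where the leading @ tells avn to load the JSON from a file; confirm it with avn service connector create --help for your CLI version. The helper names and the service name used in the example are hypothetical.

```python
import subprocess


def avn_connector_create_cmd(service: str, config_path: str) -> list[str]:
    """Build the argument list for creating a connector from a JSON file.

    Assumed CLI syntax: avn service connector create SERVICE_NAME @FILE
    """
    return ["avn", "service", "connector", "create", service, "@" + config_path]


def create_connector(service: str, config_path: str) -> None:
    """Run the command; requires avn to be installed and logged in (avn user login)."""
    subprocess.run(avn_connector_create_cmd(service, config_path), check=True)
```

For example, create_connector("my-kafka", "gcs_sink.json") runs avn service connector create my-kafka @gcs_sink.json and raises subprocess.CalledProcessError if the CLI reports a failure.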
Example: define a GCS sink connector
The example creates a GCS sink connector with the following properties:
- connector name: my_gcs_sink
- source topics: test
- target GCS bucket name: my-test-bucket
- target Google service key: {\"type\": \"service_account\", \"project_id\": \"XXXXXXXXX\", ..}
- name prefix: my-custom-prefix/
- data compression: gzip
- message data format: jsonl
- fields to include in the message: value, offset
- number of messages per file: 1
The connector configuration is the following:
{
"name": "my_gcs_sink",
"connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "test",
"gcs.credentials.json": "{\"type\": \"service_account\", \"project_id\": \"XXXXXXXXX\", ..}",
"gcs.bucket.name": "my-test-bucket",
"file.name.prefix": "my-custom-prefix/",
"file.compression.type": "gzip",
"file.max.records": "1",
"format.output.type": "jsonl",
"format.output.fields": "value,offset"
}
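Because the double escaping of the service key is easy to get wrong, it can help to check a configuration file before pasting it into the Aiven Console. The Python sketch below (check_credentials_field is a hypothetical helper) parses the configuration and then decodes the gcs.credentials.json string a second time, which succeeds only if the quotes and newlines were escaped correctly. It checks only the JSON structure and the "type" value, not whether Google Cloud accepts the key; a placeholder such as the .. in the example above fails the check.

```python
import json


def check_credentials_field(config_text: str) -> dict:
    """Parse a connector config and decode its gcs.credentials.json value.

    Raises ValueError (json.JSONDecodeError is a subclass) if the config or
    the embedded key is not valid JSON, if the field is missing or not a
    string, or if the key is not of type "service_account".
    """
    config = json.loads(config_text)
    raw = config.get("gcs.credentials.json")
    if not isinstance(raw, str):
        raise ValueError("gcs.credentials.json must be a JSON-encoded string")
    key = json.loads(raw)  # second decode: fails if escaping was wrong
    if not isinstance(key, dict) or key.get("type") != "service_account":
        raise ValueError("gcs.credentials.json is not a service account key")
    return key
```

For example, check_credentials_field(Path("gcs_sink.json").read_text()) returns the decoded key as a dict, so you can confirm fields such as project_id before creating the connector.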