Long Term Logging with Aiven for OpenSearch® and Aiven for Apache Kafka®

Meet your compliance goals using Aiven for OpenSearch®, object storage and Aiven for Apache Kafka®.

Tiered storage lets you extend the expensive disk-based storage commonly used for data applications with cheap object storage.
This lets you store more data at a fraction of the cost. Data retrieval is slower, but for some use cases the trade-off is worthwhile. One common use case is storing data for long periods due to regulatory requirements.

Tiered storage is available for both Apache Kafka® and OpenSearch® as an additional feature on top of the open source releases, but it is not part of the core projects themselves at this time. At Aiven, we decided to implement this feature using Kafka Connect so our customers don't have to wait!

Let's use an example of Aiven for Apache Kafka® as a logging source that generates data we want to send to a tiered storage solution. We'll use Terraform to define our services.

This post covers use cases where logging volumes grow exponentially as you scale. For example, you might work in an industry where compliance regulations require log retention for 7 years but, for your own observability, you rarely require logs older than 14 days. Storing all 7 years of logs can become financially problematic or require you to use more resources than you would typically need.
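To make the gap between the two retention periods concrete, here is a rough back-of-the-envelope calculation. The daily log volume is a made-up figure for illustration, and leap days are ignored; only the 14-day and 7-year periods come from the example above.

```python
# Hypothetical figure for illustration only; substitute your own volume.
DAILY_LOG_GB = 50          # assumed log volume per day
HOT_RETENTION_DAYS = 14    # logs kept in OpenSearch for observability
COMPLIANCE_YEARS = 7       # retention period required by regulation

total_days = COMPLIANCE_YEARS * 365            # leap days ignored
hot_gb = DAILY_LOG_GB * HOT_RETENTION_DAYS     # data that needs fast storage
total_gb = DAILY_LOG_GB * total_days           # data that must be retained
hot_fraction = HOT_RETENTION_DAYS / total_days

print(f"Hot tier: {hot_gb} GB, full retention: {total_gb} GB")
print(f"Share of retained logs that needs fast storage: {hot_fraction:.2%}")
```

Under these assumptions, well under 1% of the retained logs need to live on fast disk at any one time; the rest can sit in object storage.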

In this example, Apache Kafka acts as a data source for itself, but you can use Filebeat or any other supported OpenSearch agent or ingestion tool.

Architecture diagram of the logging architecture we will implement.

Using Apache Kafka, you can push logs through message queues and utilize the Kafka ecosystem (Kafka Connect, primarily) to store logs in multiple destinations. This provides redundancy, as well as cost benefits. We'll use Apache Kafka as our log transport mechanism and push logs into both OpenSearch and Google Cloud Storage. In OpenSearch, we will retain logs for 14 days then remove the indices automatically. Next we'll create a Google Cloud Function to help developers retrieve logs from object storage as needed and reindex them in OpenSearch.

We'll do the following:

  1. Deploy Aiven for Apache Kafka with Kafka Connect support enabled
  2. Create a Logging Service Integration that will allow Kafka to log to a topic within itself
  3. Deploy Aiven for OpenSearch and create an Index Pattern for the logging format we will use (one new index per day)
  4. Configure Aiven for OpenSearch to only hold the last 14 indices and delete older ones so that we do not have to monitor our index count or manually clean them up
  5. Configure the Google Cloud Storage Sink Connector to log to a bucket in our Google Cloud
  6. Configure the OpenSearch Sink Connector to log to a daily index in our Aiven for OpenSearch
  7. Deploy a Google Cloud Function that can take the name of a log file (and OpenSearch credentials) and restore the file to an index we specify
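The retention rule in step 4 can be sketched in a few lines of Python. In this post the cleanup is handled by Aiven for OpenSearch configuration, not by code like this; the sketch only illustrates the logic. The `logs-` prefix and both function names are hypothetical.

```python
from datetime import date, timedelta

def daily_index_name(day, prefix="logs-"):
    """Build the index name for one day, e.g. logs-2024-01-31."""
    return f"{prefix}{day.isoformat()}"

def indices_to_delete(index_names, keep=14, prefix="logs-"):
    """Return the indices that fall outside the newest `keep` daily indices."""
    # ISO dates sort lexicographically in chronological order.
    matching = sorted(n for n in index_names if n.startswith(prefix))
    return matching[:-keep] if keep > 0 else matching

# Twenty consecutive daily indices, of which only the newest 14 are kept.
start = date(2024, 1, 1)
names = [daily_index_name(start + timedelta(days=i)) for i in range(20)]
expired = indices_to_delete(names)
print(expired)
```

Because the index name embeds an ISO date, a plain string sort is enough to find the oldest indices.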

Set up

We'll use Aiven for Apache Kafka for this example, so before you do anything, make sure to either sign up for a new account or log into an existing one. Registering with Aiven automatically signs you up for a free 30-day, 300-credit trial with no credit card needed!

You'll also need your own Google Cloud Storage account, as we'll use that as our object storage.

We'll implement a data stream that goes from Aiven for Apache Kafka and is stored in Aiven for OpenSearch. We'll then implement object storage using Google Cloud Storage. We'll write a Google Cloud Function to retrieve cold data and re-index it in OpenSearch as needed. We'll use Aiven for Apache Kafka Connect, specifically the OpenSearch Sink Connector and the Google Cloud Storage Sink Connector, to wire the services together.

To replicate the example:

  • Clone the Aiven examples repository and navigate to solutions/kafka_long_term_logging
  • Store your Google Cloud Storage account credentials in the terraform/gcreds.json file
  • Generate an Aiven API token
  • Copy the secrets.tfvars.example file and rename it secrets.tfvars. Populate it with your Aiven API token, project and service names

Compose the solution

Once you have cloned the repository, take a look at the files inside. Essentially, we have a Terraform plan to configure our services and the function used to restore files from Google Cloud Storage. Let's break down what we have:

  • terraform/
    • gcp/
      • cloud_function - The source code for our restore function: a Python file with the dependencies in a requirements.txt file
      • restore.zip - The cloud_function folder compressed to be uploaded by Terraform
      • google_cloud_function.tf - The Terraform script for uploading the function to a bucket and creating a Google Cloud Function from the resulting upload
    • main.tf - The main Terraform script that will create the needed services and then create the Google Cloud Function using the resulting connection parameters
    • variables.tf - Used for the names of the services we want to deploy, their location and Google Cloud config


Initialize and deploy the Terraform plan by pulling the necessary modules from Aiven and Google Cloud. In the secrets.tfvars file, add our Aiven credentials:

    aiven_api_token = "$YOUR_API_TOKEN"
    project = "$YOUR_PROJECT_NAME"
    cloud = "$YOUR_CLOUD_REGION"
    kafka_svc = "kafka-logger"
    es_svc = "os-logger"

For now, don't touch the kafka_svc or es_svc values. Google Cloud's Terraform provider needs no credentials in this file because it authenticates itself. If you do not have this configured, the provider will prompt and guide you.
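If you want to check the file before running Terraform, a small script along these lines can confirm that all five variables are present. It is a minimal sketch that only understands simple name = "value" lines, not full HCL, and the function names are hypothetical.

```python
import re

REQUIRED_KEYS = {"aiven_api_token", "project", "cloud", "kafka_svc", "es_svc"}

def parse_tfvars(text):
    """Parse simple `name = "value"` assignments; not a full HCL parser."""
    pattern = re.compile(r'^\s*(\w+)\s*=\s*"([^"]*)"\s*$')
    values = {}
    for line in text.splitlines():
        match = pattern.match(line)
        if match:
            values[match.group(1)] = match.group(2)
    return values

def missing_keys(text):
    """Return the required variable names that the file does not define."""
    return sorted(REQUIRED_KEYS - set(parse_tfvars(text)))

sample = '''
aiven_api_token = "$YOUR_API_TOKEN"
project = "$YOUR_PROJECT_NAME"
kafka_svc = "kafka-logger"
es_svc = "os-logger"
'''
print(missing_keys(sample))  # the sample above leaves out "cloud"
```

To check a real file, read terraform/secrets.tfvars and pass its contents to missing_keys.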

Make sure the Google Cloud Functions and Google Cloud Build APIs are enabled in the Google Cloud Console.

Next, run the following from the terraform/ directory to initialize Terraform, create the execution plan and execute:

    terraform init
    terraform plan --var-file=secrets.tfvars
    terraform apply --var-file=secrets.tfvars

Once complete, you will have an Aiven for Apache Kafka service logging to itself and an Aiven for OpenSearch service to receive the logs. In Google Cloud, you will see that a new Cloud Function called restore has been created.

Google Cloud Functions

The restore function is a Python script exposed through an HTTP endpoint. You call it with the bucket we want to restore from, the file we want to restore and the name of the OpenSearch index to restore to. The connection information for our Aiven for OpenSearch cluster needs to be set in the environment variables for the function. To try the function out:

  1. Navigate to the Google Cloud Console, go to Cloud Functions and select the testing tab.
  2. Use the testing tab to send an example request that will restore the provided file in a specified Google Cloud Storage bucket to a specified index in your OpenSearch cluster.

An example request might look like:

    curl -m 70 -X POST https://google-demo.cloudfunctions.net/os-restore \
      -H "Authorization:bearer $(gcloud auth print-identity-token)" \
      -H "Content-Type:application/json" \
      -d '{
        "bucket": "os_backup",
        "file": "YYYY-MM-DD.json",
        "index": "restore-YYYY-MM-DD"
      }'

In this request, you provide the following:

  1. Your Google Cloud authentication token
  2. The name of the bucket you are storing logs in
  3. The filename you want to restore
  4. The index to restore to
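If you would rather send the request from a script, the same four pieces of information map onto a standard-library HTTP request. This is a sketch: the helper name is hypothetical, the URL and payload values are the placeholders from the example request, and the identity token is assumed to be obtained separately (for example with gcloud auth print-identity-token).

```python
import json
import urllib.request

def build_restore_request(url, identity_token, bucket, file_name, index):
    """Build (but do not send) the POST request for the restore function."""
    payload = {"bucket": bucket, "file": file_name, "index": index}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"bearer {identity_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

request = build_restore_request(
    "https://google-demo.cloudfunctions.net/os-restore",  # placeholder URL
    "PLACEHOLDER_TOKEN",                                   # assumed token value
    "os_backup",
    "YYYY-MM-DD.json",
    "restore-YYYY-MM-DD",
)
# To actually send it: urllib.request.urlopen(request, timeout=70)
```

Building the request separately from sending it makes it easy to inspect the payload and headers before anything leaves your machine.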

Sending this creates a new index and adds the specified file contents to it. You can extend this to send an entire bucket or folder.
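To give a feel for what the restore step involves, here is a minimal sketch of turning a log file into a body for the OpenSearch bulk API. It is not the code from the repository: it assumes the stored file holds one JSON record per line, and build_bulk_body is a hypothetical helper name.

```python
import json

def build_bulk_body(index, lines):
    """Turn JSON-lines log records into an OpenSearch _bulk request body."""
    parts = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)  # raises ValueError on malformed input
        # Each document is preceded by an action line naming the target index.
        parts.append(json.dumps({"index": {"_index": index}}))
        parts.append(json.dumps(record))
    # The bulk API expects newline-delimited JSON that ends with a newline.
    return "\n".join(parts) + "\n" if parts else ""

body = build_bulk_body(
    "restore-2024-01-01",
    ['{"message": "service started"}', "", '{"message": "service stopped"}'],
)
print(body)
```

The resulting string would be sent to the cluster's _bulk endpoint with the application/x-ndjson content type. Restoring a whole folder could reuse the same helper once per file.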

And voilà, you have near-unlimited storage for your logs, using Kafka as your log transport system and backing up to object storage with a restore function. As an added benefit, the Aiven platform automatically deletes old indices matching the provided index pattern, which means you do not need to maintain your cluster or worry about it becoming too full.


What just happened, you ask? The magic of Terraform is not only that it lets us manage infrastructure in much the same way we manage code; it also lets us share ideas in a reproducible and configurable way. No longer do we copy bash commands with a webpage open on one side of the monitor and our IDE on the other. You can read this beautiful prose without interruption and then see that the author is not lying to you by running it yourself. It is the definition of "here's one you can try at home".

This suggestion was born out of two needs: the need for engineering teams to be able to debug their applications, and the need for the company to retain those logs to meet the compliance requirements of its industry. A true open source solution does not (yet) exist that allows OpenSearch to expand its storage infinitely using S3-compatible storage. It appears to be quite common that logs do not go directly to OpenSearch and instead pass through Apache Kafka as a transport medium. This makes sense, not only from a scaling perspective but also because it allows maintenance on your OpenSearch cluster without impacting your observability solution and, as shown here, lets the same data be reused in multiple locations: compliance in sending logs to Cloud Storage, and observability in sending logs to OpenSearch.

Further reading