Aiven Blog

Jun 20, 2017

Kafka Connect Preview

Kafka Connect is now available in preview from Aiven. Find out which connectors we’re starting with and how to set them up.


Heikki Nousiainen


Chief Technology Officer at Aiven

We're delighted to announce the public preview of Kafka Connect support in Aiven for Apache Kafka. During the preview, Kafka Connect is available at no extra cost as part of all Aiven for Apache Kafka Business and Premium plans. We're launching with support for the OpenSearch connector, and will soon follow with S3 and other connectors.

Kafka Connect

Kafka Connect is a framework for linking Kafka with other services. It makes it simple to define and configure connectors that stream data between different systems reliably and at scale. Kafka Connect provides a standard API for integration and handles offset management and workload distribution automatically.

You can define and configure individual connectors via the Kafka Connect REST interface.
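As a rough illustration of what that REST interface looks like, the sketch below builds the URLs for a handful of standard Kafka Connect endpoints. The base URL and the connector name "my-sink" are placeholders, and the helper function is our own naming for this example, not part of any library.

```python
from urllib.parse import quote

# Placeholder base URL; substitute the Kafka Connect access URL of your service.
CONNECT_URL = "https://connect.example.com:22142"


def connect_endpoints(base_url, connector_name):
    """Return a few commonly used Kafka Connect REST endpoints as (method, URL) pairs."""
    base = base_url.rstrip("/")
    name = quote(connector_name, safe="")
    return {
        "list_plugins": ("GET", "{}/connector-plugins".format(base)),
        "list_connectors": ("GET", "{}/connectors".format(base)),
        "create_connector": ("POST", "{}/connectors".format(base)),
        "get_config": ("GET", "{}/connectors/{}/config".format(base, name)),
        "get_status": ("GET", "{}/connectors/{}/status".format(base, name)),
        "delete_connector": ("DELETE", "{}/connectors/{}".format(base, name)),
    }


# Print an overview of the endpoints for one (hypothetical) connector.
for action, (method, url) in connect_endpoints(CONNECT_URL, "my-sink").items():
    print("{:17} {:6} {}".format(action, method, url))
```

Each of these can be called with any HTTP client; the create call expects a JSON body containing the connector name and its configuration.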

Case example - IoT Device Shadow

A customer of ours is using Aiven for Apache Kafka for capturing telemetry from a fleet of IoT devices. To that end, Aiven for Apache Kafka has proven to be a scalable and flexible pipeline for capturing and distributing traffic for processing.

During the past month, we've worked together to support a new use case: maintaining a "device shadow", a copy of each device's latest state update, in OpenSearch. This copy allows developers to query and access device states regardless of whether the devices are currently online and connected.

We built this new pipeline with Kafka Connect and the OpenSearch connector. You can follow these steps to set up a similar pipeline.

Getting started: Launching Kafka and OpenSearch services

Create an Aiven for Apache Kafka service and create your topics for the incoming traffic. In this example, we'll use the Business-4 plan for the service and 16 partitions to accommodate the client load.

First we'll launch a Kafka cluster from the Aiven Console. This cluster will receive the state updates from the IoT devices. A fairly low-spec cluster will work for this use case and we will launch it in one of the AWS regions:

Next, we'll create a Kafka topic for our data under the Topics tab.

We chose 16 partitions in this example, but you should select a number that matches your workload. A larger number supports higher throughput, but it also increases resource usage on both the cluster and the consumer side. Contact us if you're unsure; we can help you find a suitable plan.
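One way to reason about the partition count: within a single consumer group, each partition is read by at most one consumer, so the number of partitions caps how many consumers can work in parallel. The sketch below illustrates this with a simple round-robin assignment. It is only an illustration; Kafka's actual assignment strategies are more involved, and the function name is made up for this example.

```python
def assign_partitions(num_partitions, num_consumers):
    """Spread partitions over consumers round-robin; return {consumer_index: [partitions]}."""
    assignment = {consumer: [] for consumer in range(num_consumers)}
    for partition in range(num_partitions):
        assignment[partition % num_consumers].append(partition)
    return assignment


# Compare a few consumer counts against the 16 partitions used in this example.
for consumers in (3, 16, 20):
    assignment = assign_partitions(16, consumers)
    busy = sum(1 for partitions in assignment.values() if partitions)
    print("{} consumers: {} active, {} idle".format(consumers, busy, consumers - busy))
```

With 16 partitions, any consumer beyond the sixteenth in the same group has nothing to read, which is why the partition count should be chosen with the expected number of consumers in mind.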

We will also need an OpenSearch cluster for the device shadow data. We'll choose a three-node cluster with 4 GB of memory in each node. Make a note of the OpenSearch Service URL, which we'll use in the connector configuration in the next steps.

We'll need to enable Kafka Connect by clicking the "Enable" button next to it in the service view. We also make a note of the Kafka Connect access URL, which we will need in the following steps.

Setting up the pipeline with scripts

We'll be using a few Python code snippets to configure our data pipeline. From the Kafka service view, we've downloaded the project CA certificate and the Kafka access certificate and key to a local directory as ca.pem, service.cert and service.key.

You can refer to startup guides for both Aiven Kafka and Aiven OpenSearch for details on setting up the environment.

Here's our first snippet named query_connector_plugins.py for finding out the available connector plugins:

import requests

AIVEN_KAFKA_CONNECT_URL = "https://avnadmin:m9jyevsaehezqs36@gadget-kafka.htn-aiven-demo.aivencloud.com:22142"

response = requests.get("{}/connector-plugins".format(AIVEN_KAFKA_CONNECT_URL))
print(response.text)

By running the script we can find out the available connector plugins:

$ python3 query_connector_plugins.py
[{"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"}]

To get started with the pipeline configuration, we'll pre-create an OpenSearch index with a schema that meets our needs, using a script named create_elastic_index.py:

import json
import requests

AIVEN_ELASTICSEARCH_URL = "https://avnadmin:in9zvfjaio32m0qy@gadget-elastic.htn-aiven-demo.aivencloud.com:24185"

mapping = {
    "settings": {
        "number_of_shards": 16
    },
    "mappings": {
        "kafka-connect-gadget-telemetry": {
            "properties": {
                "location": {"type": "string"},
                "temperature": {"type": "integer"},
                "timestamp": {"type": "date"}
            }
        }
    }
}

response = requests.put(
    "{}/gadget-telemetry?pretty=true".format(AIVEN_ELASTICSEARCH_URL),
    headers={"content-type": "application/json"},
    data=json.dumps(mapping),
    verify="ca.pem",
)
print(response.text)

Next, we'll run the script to create the Elasticsearch index:

$ python3 create_elastic_index.py
{
  "acknowledged" : true,
  "shards_acknowledged" : true
}

Here's how we create and configure the actual Elasticsearch Connector to link our telemetry topic and Elasticsearch with a script named create_es_connector.py:

import requests
import json

AIVEN_KAFKA_CONNECT_URL = "https://avnadmin:m9jyevsaehezqs36@gadget-kafka.htn-aiven-demo.aivencloud.com:22142"
AIVEN_ELASTICSEARCH_URL = "https://avnadmin:in9zvfjaio32m0qy@gadget-elastic.htn-aiven-demo.aivencloud.com:24185"

connector_create_request = {
    "name": "gadget-es-sink",
    "config": {
        "connection.url": AIVEN_ELASTICSEARCH_URL,
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": 3,
        "topics": "gadget-telemetry",
        "type.name": "kafka-connect-gadget-telemetry"  # This points to the created ES mapping
    }
}

response = requests.post(
    "{}/connectors".format(AIVEN_KAFKA_CONNECT_URL),
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector_create_request)
)
print(response.text)

We then enable the connector by running the script:

$ python3 create_es_connector.py
{"name":"gadget-es-sink","config":{"topics":"gadget-telemetry", "type.name":"kafka-connect-gadget-telemetry", "tasks.max":"3", "connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "connection.url":"https://avnadmin:in9zvfjaio32m0qy@gadget-elastic.htn-aiven-demo.aivencloud.com:24185", "name":"gadget-es-sink"}, "tasks":[]}
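The "tasks" list in the creation response is still empty, so it can be worth checking the connector's status once the tasks have had time to start. Kafka Connect reports this at /connectors/<name>/status. The sketch below summarizes such a status document; the sample payload is hand-written in the shape that endpoint returns, the worker_id values are invented, and summarize_status is a hypothetical helper for this example.

```python
def summarize_status(status):
    """Return (connector_state, [ids of tasks whose state is not RUNNING])."""
    connector_state = status.get("connector", {}).get("state", "UNKNOWN")
    failed_tasks = [
        task["id"] for task in status.get("tasks", []) if task.get("state") != "RUNNING"
    ]
    return connector_state, failed_tasks


# Hand-written sample in the shape returned by GET /connectors/<name>/status;
# the worker_id values are made up for illustration.
sample_status = {
    "name": "gadget-es-sink",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083"},
        {"id": 2, "state": "RUNNING", "worker_id": "10.0.0.3:8083"},
    ],
}

state, failed = summarize_status(sample_status)
print("connector: {}, tasks needing attention: {}".format(state, failed))
```

Against a live service, the same function could be fed the parsed JSON from a GET request to "{}/connectors/gadget-es-sink/status".format(AIVEN_KAFKA_CONNECT_URL).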

Next, we're going to send some simulated telemetry data to test everything out, using a script named submit_telemetry.py:

from kafka import KafkaProducer
import datetime
import json
import random

AIVEN_KAFKA_URL = "gadget-kafka.htn-aiven-demo.aivencloud.com:22144"
LOCATIONS = ["arizona", "california", "nevada", "utah"]

producer = KafkaProducer(
    bootstrap_servers=AIVEN_KAFKA_URL,
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
)

for i in range(10):
    device_name = "gadget_{}".format(i)
    telemetry = {
        "location": random.choice(LOCATIONS),
        "temperature": random.randint(40, 120),
        "timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    }
    # The device name is the message key, so each device maps to one document
    key = device_name.encode("utf-8")
    payload = json.dumps(telemetry).encode("utf-8")
    producer.send("gadget-telemetry", key=key, value=payload)

# Wait for all messages to be sent
producer.flush()
# i is zero-based, so the number of messages sent is i + 1
print("Done, sent {} messages".format(i + 1))

Now we execute the script:

$ python3 submit_telemetry.py
Done, sent 10 messages

Exploring the data with OpenSearch dashboards

All of our Elasticsearch plans include integrated Kibana, which can be a handy tool for exploring and/or visualizing the data too. We can easily verify that our telemetry is flowing all the way to our Elasticsearch instance.

Clicking the Kibana link under the Elasticsearch service information page opens a view to OpenSearch dashboards. We are greeted with a configuration page where we enter the name of our Elasticsearch index created in one of the earlier steps:

Default discovery view on our sample data. The default view lists our entries. Since we're using keyed messages and the entry is always replaced with the latest entry, the timeline view will show only the timestamp of the last reception.
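The replace-by-key behaviour described here can be pictured with a plain dictionary. This is only a sketch under simplifying assumptions: the dictionary stands in for the index, the sample messages are invented, and apply_to_shadow is a made-up helper. The real connector also handles batching, retries and offsets.

```python
import json

# Simulated (key, value) messages as they might arrive on the topic, oldest first.
messages = [
    (b"gadget_1", b'{"location": "utah", "temperature": 71}'),
    (b"gadget_2", b'{"location": "nevada", "temperature": 98}'),
    (b"gadget_1", b'{"location": "arizona", "temperature": 104}'),
]


def apply_to_shadow(shadow, key, value):
    """Index one message: the key is the document ID, so newer values overwrite older ones."""
    shadow[key.decode("utf-8")] = json.loads(value.decode("utf-8"))
    return shadow


shadow = {}  # stands in for the index holding the device shadow
for key, value in messages:
    apply_to_shadow(shadow, key, value)

print(len(shadow), "documents")
print(shadow["gadget_1"])
```

Three messages produce only two documents, and gadget_1 holds its most recent report. That is the reason the timeline shows a single timestamp per device.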

Accessing data in Elasticsearch

The real value of the new pipeline is realized with the ability to query for device information from Elasticsearch. In the Elasticsearch example query script (query_elasticsearch.py) below, we'll query for all devices that last reported from Arizona:

import requests
import json

AIVEN_ELASTICSEARCH_URL = "https://avnadmin:in9zvfjaio32m0qy@gadget-elastic.htn-aiven-demo.aivencloud.com:24185"

response = requests.get(
    "{}/_search?q=location:arizona&pretty=true".format(AIVEN_ELASTICSEARCH_URL),
    verify="ca.pem"
)
print(response.text)

Running the script shows the list of active gadgets in the target region:

$ python3 query_elasticsearch.py
{
  "took" : 7,
  "timed_out" : false,
  "_shards" : {
    "total" : 4,
    "successful" : 4,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.3862944,
    "hits" : [
      {
        "_index" : "gadget-telemetry",
        "_type" : "kafka-connect-gadget-telemetry",
        "_id" : "gadget_2",
        "_score" : 1.3862944,
        "_source" : {
          "temperature" : 114,
          "location" : "arizona",
          "timestamp" : "2017-12-06T13:55:01Z"
        }
      },
      {
        "_index" : "gadget-telemetry",
        "_type" : "kafka-connect-gadget-telemetry",
        "_id" : "gadget_5",
        "_score" : 1.2039728,
        "_source" : {
          "temperature" : 45,
          "location" : "arizona",
          "timestamp" : "2017-12-06T13:55:01Z"
        }
      }
    ]
  }
}

The above example is easily extended to query data by a certain temperature threshold, location or time of the last update. Or, if we want to check the state of a single device, we now have the latest state available by its ID.
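As a sketch of those two extensions, the helpers below build the URL for a single device's document and a range query body for a temperature threshold. The index, type and ID layout follows what the search response above shows; newer OpenSearch and Elasticsearch versions no longer use mapping types in document paths, so treat the path as an assumption tied to this example. The base URL is a placeholder and both function names are ours.

```python
import json

INDEX = "gadget-telemetry"
DOC_TYPE = "kafka-connect-gadget-telemetry"


def device_url(base_url, device_id):
    """URL of a single device's document, addressed by its ID (the Kafka message key)."""
    return "{}/{}/{}/{}".format(base_url.rstrip("/"), INDEX, DOC_TYPE, device_id)


def hot_devices_query(min_temperature):
    """Query body matching devices whose last reported temperature is at or above a threshold."""
    return {"query": {"range": {"temperature": {"gte": min_temperature}}}}


# Placeholder base URL; use your own service URL.
print(device_url("https://search.example.com:24185", "gadget_2"))
print(json.dumps(hot_devices_query(100)))
```

A GET request to the device URL would return that device's latest state, while the query body would be sent as JSON to the index's _search endpoint.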

Summary

In this example, we built a simple telemetry pipeline with Kafka, Kafka Connect and Elasticsearch. We used the Elasticsearch connector, which is the first connector we support with Aiven Kafka. We'll follow up with the S3 connector shortly, with others to come.

Get in touch if we can help you with your business requirements!

Give Aiven services a whirl

Remember that trying Aiven is free: you will receive US$10 worth of free credits at sign-up which you can use to try any of our service plans. The offer works for all of our services: PostgreSQL, Redis, Grafana, Elasticsearch and Kafka!

Go to aiven.io to get started!

