15 Feb 2022

Move from batch to streaming with Apache Kafka® and Apache Flink®

Find out how Apache Kafka® and Apache Flink® allow you to move from batch processing to streaming, but keep using SQL in the data pipeline.


Francesco Tisiot


Senior Developer Advocate at Aiven


If data is the new gold, then data pipelines must be the mining shafts and SQL the pickaxe, allowing information to travel across the company from the sources to the decision-enabling dashboards. But delivering data assets to the right audience is only part of the problem; doing so in a timely manner is becoming more and more critical. The old days of looking today at a dashboard containing yesterday's data are gone: we need to analyse data, capture trends and detect outliers as soon as possible.

More and more companies are consequently moving away from batch and embracing streaming solutions, which enable near-real-time data pipelines. To make this transition easier, it is usually sensible to select a target technology that speaks a language similar to the original, since existing skills can then be reused with small adaptations.

The combination of Apache Kafka® and Apache Flink®, two open source projects aiming respectively at transmitting and computing streaming data, is therefore a good choice, since it enables the transition from batch to streaming while keeping the data pipeline definitions in data practitioners' most beloved language: SQL!

We previously wrote about the duo, showcasing how you could write your own streaming data pipelines using a dockerized version of Apache Flink®'s SQL client. This time we'll create new data pipelines for a different use case and show how we can minimize the analytics latency. On top of this, there is more good news: we no longer have to worry about running Apache Flink® ourselves, since we can rely on the managed services provided by Aiven!

For this blog post, we are going to mimic an inbound streaming dataset of IoT CPU utilization measurements. Since IoT devices can be geographically distributed, messages can arrive late or out of order. Before further processing, we therefore want to allow late arrivals of IoT records with a delay of up to 10 seconds. In rough terms, this means that we'll wait 10 more seconds before finalizing any window calculation.
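The late-arrival rule can be sketched in a few lines of shell. This is a simplified model rather than Apache Flink®'s actual implementation: it assumes the watermark is the highest event time seen so far minus the allowed delay, and it treats a record as late when its 5-minute window ended at or before the watermark. The event times (in seconds) are made up for illustration.

```shell
#!/bin/sh
# Simplified watermark model (an assumption for illustration, not Flink code).
WINDOW=300   # 5-minute windows, in seconds
DELAY=10     # allowed delay for late records, in seconds

max_ts=0
results=""
# Hypothetical event times in arrival order; 298 and 299 arrive out of order.
for ts in 100 295 305 298 311 299; do
  # The watermark trails the highest event time seen so far by DELAY seconds.
  if [ "$ts" -gt "$max_ts" ]; then max_ts=$ts; fi
  watermark=$((max_ts - DELAY))
  # End of the tumbling window this record belongs to.
  window_end=$(( (ts / WINDOW + 1) * WINDOW ))
  if [ "$window_end" -le "$watermark" ]; then
    status="late"
  else
    status="ok"
  fi
  results="$results $ts:$status"
  echo "event_time=$ts watermark=$watermark window_end=$window_end -> $status"
done
```

In this run the record with event time 298 arrives after 305 but is still accepted, because the watermark (295) has not yet passed the end of its window (300). The record with event time 299 arrives after 311 has pushed the watermark to 301, so its window is already closed and it is marked late.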

When we monitor any type of hardware, we might want to check if the device health parameters are within an optimal range, and if not, start raising alerts. We can achieve that by creating a data pipeline to filter all high CPU utilization records and push them to a new Apache Kafka® topic, where a downstream consuming application will then trigger the alerts. In the old batch days we would run the same query every few minutes to check for high values; with streaming we can redirect the problematic records as soon as they appear.

Checking every individual sample against the threshold might be a bit bursty, and we are fine with a single CPU sample going over the limit. On the other hand, we might want to calculate the average and maximum CPU level at 5-minute intervals, since those metrics can help us identify problematic CPUs under consistently heavy load. To achieve this second monitoring step, we'll create another pipeline aggregating samples in 5-minute windows. This use case can also be solved in batch mode, but in that case we have to start the batch load 10 seconds (the allowed delay) after the end of the 5-minute window, and we see the results only once the batch completes. If the batch time for collecting data, calculating and storing results is 1 minute, we discover the final status with 1 minute and 10 seconds of delay.

With Apache Flink®, if the query allows it, the window information is captured and calculated incrementally during the window time itself. Therefore, once the 10-second delay has elapsed, only a minimal overhead remains for the last computation to finish before the result is emitted. For simple queries, we retrieve the results little more than 10 seconds (the forced delay) after the window ends.
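The latency comparison in the last two paragraphs boils down to simple arithmetic. The sketch below uses the values from the example in the text; the variable names are only illustrative.

```shell
#!/bin/sh
# Values taken from the example in the text.
ALLOWED_DELAY=10   # seconds we wait for late records
BATCH_TIME=60      # seconds a batch needs to collect, calculate and store results

# Batch: the job starts after the allowed delay and results appear when it ends.
batch_latency=$((ALLOWED_DELAY + BATCH_TIME))

echo "batch:     results ${batch_latency}s after the window ends"
echo "streaming: results about ${ALLOWED_DELAY}s after the window ends, plus a small final computation"
```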

What we'll notice when writing the two data pipelines is how similar Apache Flink® SQL is to the SQL we would use to query a relational database. This makes the migration from batch processes to streaming a matter of learning a few peculiarities of Apache Flink®.

We will use Aiven for Apache Kafka® as the data bus and Aiven for Apache Flink® to define some data pipelines in SQL. Aiven provides a beautiful web console that we can use to create all our services, but for this blog post we will rely on the Aiven CLI to set up and manage our instances, since it provides a way to script the entire process and produce replicable results.

Once the Aiven CLI is installed, we can start creating the Aiven for Apache Kafka® service by issuing the following command in the terminal:

avn service create demo-kafka \
  --service-type kafka \
  --cloud google-europe-west3 \
  --plan business-4 \
  -c kafka.auto_create_topics_enable=true \
  -c kafka_rest=true \
  -c schema_registry=true

The above creates an Apache Kafka® instance named demo-kafka located in google-europe-west3 region, with the 3-node cluster defined by the business-4 plan. To store schema information we enable schema registry via Aiven's Karapace, and we allow querying it via REST calls by enabling Kafka REST. Finally we allow the automatic creation of topics on the first message, useful for a demo project like this.

Now, we can create the Aiven for Apache Flink® service with:

avn service create demo-flink \
  --service-type flink \
  --cloud google-europe-west3 \
  --plan business-4

Compared to the previous call, the only differences are the service name (now demo-flink) and the service type (now flink).

To complete the setup, we need to connect the two pillars with a service integration, and again the Aiven CLI is our friend.

avn service integration-create \
  -t flink \
  -s demo-kafka \
  -d demo-flink

In the above, we specify an integration of type flink connecting the data source demo-kafka to the Aiven for Apache Flink® service called demo-flink.

We need some data to play with, and instead of the usual pizza example, we will now use fake metrics reported by IoT (Internet of Things) devices. The beauty is that it's contained in the same GitHub repository that we were using for our previous example, which has a flag called subject allowing us to generate various types of fake data.

The associated GitHub repository provides a dockerised version. To start using it, we first clone the repository:

git clone https://github.com/aiven/fake-data-producer-for-apache-kafka-docker

Next, we need to create an access token, which the Docker instance will use to retrieve the Apache Kafka® service URI and certificates. We can create it with the following Aiven CLI command, using jq to extract the token from the result:

avn user access-token create \
  --description "Token used by Fake data generator" \
  --max-age-seconds 3600 \
  --json | jq -r '.[].full_token'

The above generates a token valid for 1 hour (3600 seconds). Take note of the command output, which is the value of the full_token field, since it's now time to include it in the repository's configuration file. Within the fake-data-producer-for-apache-kafka-docker folder, let's copy conf/env.conf.sample to conf/env.conf and edit the file with the following content:

PROJECT_NAME="[YOUR_PROJECT_NAME]"
SERVICE_NAME="demo-kafka"
TOPIC="iot-data-input"
PARTITIONS=1
REPLICATION=2
NR_MESSAGES=0
MAX_TIME=1
SUBJECT="metric"
USERNAME="[YOUR_ACCOUNT]"
TOKEN="[YOUR_ACCESS_TOKEN]"

Replace [YOUR_PROJECT_NAME] with the name of the project where Aiven for Apache Kafka® is running, and the pair [YOUR_ACCOUNT], [YOUR_ACCESS_TOKEN] with the account credentials necessary to connect to Apache Kafka®. If you followed these examples, they are your email and the token you generated a few steps above.
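Since the configuration is baked into the Docker image, it can be handy to check that no placeholder is left before building. The following is a minimal sketch: the conf_is_filled helper is hypothetical (it is not part of the repository), and the demonstration runs on a temporary file with a made-up project name rather than on the real conf/env.conf.

```shell
#!/bin/sh
# Hypothetical helper: succeeds only if the file exists and contains
# no remaining [YOUR_...] placeholder.
conf_is_filled() {
  [ -f "$1" ] && ! grep -q '\[YOUR_' "$1"
}

# Demonstration on a temporary file instead of the real conf/env.conf.
sample=$(mktemp)

printf 'PROJECT_NAME="[YOUR_PROJECT_NAME]"\nSERVICE_NAME="demo-kafka"\n' > "$sample"
if conf_is_filled "$sample"; then before="ready"; else before="placeholders left"; fi

# "my-project" is a made-up project name.
printf 'PROJECT_NAME="my-project"\nSERVICE_NAME="demo-kafka"\n' > "$sample"
if conf_is_filled "$sample"; then after="ready"; else after="placeholders left"; fi

echo "before: $before, after: $after"
rm -f "$sample"
```

In practice the same function could be called with conf/env.conf as its argument right before building the image.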

Time to build the docker image with:

docker build -t fake-data-producer-for-apache-kafka-docker .

If you change any parameter in the conf/env.conf file (like the access token), you need to rebuild the image so that the changes are included in the Docker image.

Finally, start the fake IoT data flow with:

docker run fake-data-producer-for-apache-kafka-docker

After a few seconds, we should see a continuous stream of messages being created in the Apache Kafka® topic named iot-data-input. We can check them using kcat. Use the instructions in the dedicated doc page to install kcat and set up the kcat.config configuration file, and then check the data flowing in Apache Kafka® with:

kcat -F kcat.config -C -t iot-data-input

We should see a stream of IoT device cpu usage measurements.

For the rest of the post, we'll need the ID of the integration we just created. To retrieve it, we can issue the following command, relying on the Aiven CLI and some jq filtering of the results:

KAFKA_FLINK_SI=$(avn service integration-list demo-flink --json \
  | jq -r '.[] | select(.source == "demo-kafka") | .service_integration_id')

The above command lists the integrations, selects the one whose source is the demo-kafka service, and stores its ID in the KAFKA_FLINK_SI variable, which we'll use later.
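To see what the jq filter does without calling the Aiven API, we can run it against a small hand-written sample. The JSON below is hypothetical: it contains only the two fields the filter relies on, with made-up IDs, while the real command output carries more information. The sketch also checks that the variable is not empty, since an empty ID would make the following commands fail in confusing ways.

```shell
#!/bin/sh
# Hypothetical, trimmed-down sample of an integration list (made-up IDs).
sample='[
  {"source": "demo-kafka", "service_integration_id": "11111111-aaaa"},
  {"source": "other-service", "service_integration_id": "22222222-bbbb"}
]'

if command -v jq >/dev/null 2>&1; then
  # Same filter as above: keep the entry whose source is demo-kafka.
  KAFKA_FLINK_SI=$(printf '%s' "$sample" \
    | jq -r '.[] | select(.source == "demo-kafka") | .service_integration_id')
else
  KAFKA_FLINK_SI=""
  echo "jq is not installed" >&2
fi

# Report clearly when nothing matched instead of carrying an empty ID along.
if [ -z "$KAFKA_FLINK_SI" ]; then
  echo "no integration found for demo-kafka" >&2
else
  echo "integration id: $KAFKA_FLINK_SI"
fi
```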

The next step is to set up a streaming data pipeline in Apache Flink®. Since we are using the Aiven managed service, we can use the Aiven client for all the steps. First, in a terminal window where the KAFKA_FLINK_SI variable is set, we define an Apache Flink® table over the inbound Apache Kafka® topic with:

avn service flink table create demo-flink $KAFKA_FLINK_SI \
  --table-name iot_in \
  --kafka-topic iot-data-input \
  --kafka-connector-type kafka \
  --kafka-value-format json \
  --kafka-startup-mode earliest-offset \
  --schema-sql '''
    hostname VARCHAR,
    cpu VARCHAR,
    usage DOUBLE,
    `occurred_at` BIGINT,
    time_ltz AS TO_TIMESTAMP_LTZ(`occurred_at`, 3),
    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '\''10'\'' seconds
    '''

The above command creates an Apache Flink® table definition in the demo-flink service, using the ID of the integration with the Apache Kafka® service stored in the $KAFKA_FLINK_SI variable we fetched before. The table is called iot_in; it reads from the beginning (earliest-offset) of the topic iot-data-input using the standard kafka connector (check more about the different Apache Kafka® connectors in the dedicated documentation) and expects the inbound data in JSON format.

The schema of the table maps the fields hostname, cpu, usage and occurred_at. Moreover, it defines a new computed field, called time_ltz, which converts the occurred_at timestamp, expressed as Unix epoch milliseconds (hence the precision argument 3), to the native timestamp type used by Apache Flink®. Finally, the WATERMARK section allows late events to arrive: IoT measurements are considered valid and included in the downstream data pipelines if they arrive with a delay of at most 10 seconds.

We can notice that, apart from the peculiar WATERMARK definition, the rest of the SQL definition is really similar to what we used over relational databases.

We can define the data output of our filtering data pipeline with the following command:

avn service flink table create demo-flink $KAFKA_FLINK_SI \
  --table-name iot_filtered_alert \
  --kafka-topic iot-filtered-alert \
  --kafka-connector-type kafka \
  --kafka-value-format json \
  --schema-sql '''
    hostname VARCHAR,
    time_ltz TIMESTAMP(3),
    cpu VARCHAR,
    usage DOUBLE
    '''

Next we need to fetch the two table IDs that we're going to need for the filtering data pipeline definition. We can retrieve the ids with:

TABLE_IN_ID=$(avn service flink table list demo-flink --json \
  | jq -r '.[] | select(.table_name == "iot_in") | .table_id')
TABLE_FILTER_OUT_ID=$(avn service flink table list demo-flink --json \
  | jq -r '.[] | select(.table_name == "iot_filtered_alert") | .table_id')

Now we can define the SQL data pipeline for filtering with:

avn service flink job create demo-flink my_first_filter \
  --table-ids $TABLE_IN_ID $TABLE_FILTER_OUT_ID \
  --statement '''
    insert into iot_filtered_alert
    select
      hostname,
      time_ltz,
      cpu,
      usage
    from iot_in
    where usage > 90
    '''

The above filters the IoT measurements having usage greater than 90% and inserts them into the Apache Kafka® topic pointed to by the iot_filtered_alert table. We are passing the table IDs $TABLE_IN_ID and $TABLE_FILTER_OUT_ID retrieved in the previous step.

We can check the filtered data being pushed to the Apache Kafka® topic named iot-filtered-alert with kcat and the following call:

kcat -F kcat.config -C -t iot-filtered-alert

Only measurements with over 90% utilization should appear in the iot-filtered-alert topic, similar to the below:

{"hostname":"dopey","time_ltz":"2022-01-21 11:05:23.319","cpu":"cpu4","usage":90.51760978462883}
{"hostname":"happy","time_ltz":"2022-01-21 11:05:25.954","cpu":"cpu4","usage":99.6863297672615}
{"hostname":"sneezy","time_ltz":"2022-01-21 11:05:30.701","cpu":"cpu5","usage":97.19928378993606}
{"hostname":"doc","time_ltz":"2022-01-21 11:05:32.142","cpu":"cpu2","usage":95.69989296729409}
{"hostname":"doc","time_ltz":"2022-01-21 11:05:32.773","cpu":"cpu3","usage":94.04115316937872}
{"hostname":"grumpy","time_ltz":"2022-01-21 11:05:34.052","cpu":"cpu2","usage":95.45458336597062}
{"hostname":"bashful","time_ltz":"2022-01-21 11:05:34.506","cpu":"cpu1","usage":93.83903103097724}
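If we want more than a visual check, a few lines of awk can count the records that should not have passed the filter. This is a rough sketch that assumes one flat JSON record per line, as in the output above, and extracts the usage value with a regular expression instead of a real JSON parser. The first two sample records are copied from the output above; the third one, with usage 42.0, is made up to show what a violation looks like.

```shell
#!/bin/sh
THRESHOLD=90

# Two records from the sample output plus one made-up record below the threshold.
records='{"hostname":"dopey","time_ltz":"2022-01-21 11:05:23.319","cpu":"cpu4","usage":90.51760978462883}
{"hostname":"happy","time_ltz":"2022-01-21 11:05:25.954","cpu":"cpu4","usage":99.6863297672615}
{"hostname":"doc","time_ltz":"2022-01-21 11:05:32.142","cpu":"cpu1","usage":42.0}'

# Count the records whose usage is at or below the threshold.
violations=$(printf '%s\n' "$records" | awk -v limit="$THRESHOLD" '
  match($0, /"usage":[0-9.]+/) {
    # Skip the 8 characters of the "usage": prefix and keep the number.
    usage = substr($0, RSTART + 8, RLENGTH - 8) + 0
    if (usage <= limit) bad++
  }
  END { print bad + 0 }')

echo "records at or below ${THRESHOLD}%: $violations"
```

Against the real topic, where the filtering job is doing its work, the same awk program fed with the kcat output should report zero.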

Great, we built our first filtering pipeline redirecting problematic samples to a new Apache Kafka® topic where downstream applications can access them and generate the required alerts. It's streaming, so the alerts are detected and propagated in near real time.

In the second scenario, we want to define an aggregation over the IoT metrics, calculating the average and maximum usage over 5-minute windows to help us smooth the bursty sample behavior. The source table is still the iot_in table defined above; we only need to define a new output table with:

avn service flink table create demo-flink $KAFKA_FLINK_SI \
  --table-name iot_avg_out \
  --kafka-topic iot-avg-output \
  --kafka-connector-type kafka \
  --kafka-value-format json \
  --schema-sql '''
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    hostname VARCHAR,
    cpu VARCHAR,
    avg_usage DOUBLE,
    max_usage DOUBLE
    '''

The above command generates a new Apache Flink® table definition called iot_avg_out pointing to the topic iot-avg-output. The results will be written in JSON format. The schema SQL includes the window_start and window_end columns defining the boundaries of the window calculated using the event time, the IoT device hostname and cpu, and the aggregated columns for average and maximum usage.

As before, the streaming job definition requires the table ID as a parameter. We can retrieve the new table ID with the following command:

TABLE_AGG_OUT_ID=$(avn service flink table list demo-flink --json \
  | jq -r '.[] | select(.table_name == "iot_avg_out") | .table_id')

With all the information needed, we can now define the streaming job with:

avn service flink job create demo-flink my_first_agg \
  --table-ids $TABLE_IN_ID $TABLE_AGG_OUT_ID \
  --statement '''
    insert into iot_avg_out
    select
      window_start,
      window_end,
      hostname,
      cpu,
      avg(usage),
      max(usage)
    from
      TABLE(TUMBLE(TABLE iot_in, DESCRIPTOR(time_ltz), INTERVAL '\''5'\'' MINUTES))
    group by
      window_start,
      window_end,
      hostname,
      cpu
    '''

The above call creates a new job named my_first_agg, including the $TABLE_IN_ID from earlier and the $TABLE_AGG_OUT_ID table ID we fetched in the previous step. The SQL statement creates the 5-minute time windows, based on the event time column time_ltz, with the TUMBLE function (check out the dedicated window documentation) and performs the aggregations. We can notice again that, apart from the custom TUMBLE definition, the SQL statement is really similar to the one we could use in a relational database.
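To build an intuition for what the tumbling window does, here is a small offline imitation in awk. It is only a sketch of the idea and leaves out watermarks and late records entirely. The input format (epoch seconds, hostname, cpu, usage, separated by spaces) and the sample values are made up. Each record is assigned to the window starting at its timestamp rounded down to a multiple of the window size, then grouped by window, hostname and cpu to compute the average and the maximum.

```shell
#!/bin/sh
# Made-up samples: epoch_seconds hostname cpu usage
samples='1000 doc cpu1 80
1100 doc cpu1 90
1250 happy cpu2 70
1300 doc cpu1 100
1400 doc cpu1 60'

# size is the window length in seconds (5 minutes).
result=$(printf '%s\n' "$samples" | awk -v size=300 '
{
  # Tumbling windows are fixed-size and do not overlap:
  # round the timestamp down to the start of its window.
  start = $1 - ($1 % size)
  key = start " " (start + size) " " $2 " " $3
  value = $4 + 0
  count[key]++
  sum[key] += value
  if (!(key in max) || value > max[key]) max[key] = value
}
END {
  # Columns: window_start window_end hostname cpu avg_usage max_usage
  for (key in count) print key, sum[key] / count[key], max[key]
}' | LC_ALL=C sort)

echo "$result"
```

The first two samples fall in the window from 900 to 1200, the others in the window from 1200 to 1500, so doc/cpu1 yields two rows (one per window) and happy/cpu2 yields one.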

To check the aggregated results being pushed to the iot-avg-output Apache Kafka® topic we use kcat and the following call:

kcat -F kcat.config -C -t iot-avg-output

The result should be similar to the following, showing the window start and end time, the IoT device hostname and cpu, and the aggregated utilization metrics:

{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"dopey","cpu":"cpu2","avg_usage":83.75267676383785,"max_usage":98.69540798403519}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"happy","cpu":"cpu2","avg_usage":86.92580172018437,"max_usage":99.19071554040988}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"happy","cpu":"cpu1","avg_usage":88.89410118795028,"max_usage":98.63471212455435}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"dopey","cpu":"cpu4","avg_usage":84.0739509212965,"max_usage":98.74529247031236}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"grumpy","cpu":"cpu4","avg_usage":79.39103768789909,"max_usage":91.3979757069514}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"sneezy","cpu":"cpu2","avg_usage":84.2944495941075,"max_usage":99.55062006322083}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"grumpy","cpu":"cpu3","avg_usage":82.57313518548608,"max_usage":99.47190111994642}
{"window_start":"2022-01-21 10:20:00","window_end":"2022-01-21 10:25:00","hostname":"sneezy","cpu":"cpu4","avg_usage":86.71661332259018,"max_usage":99.61266191120228}

Now that we have created our 5-minute windows topic, we could iterate again on our monitoring solution by adding a follow-up Apache Flink® job that keeps only the data going over a threshold. The beauty of this approach is that we can plug in one step of our solution at a time, storing the intermediate data in the common bus represented by Apache Kafka®. Furthermore, since the data is stored in an Apache Kafka® topic, we could use Apache Kafka Connect® to sink the data to OpenSearch® and use OpenSearch Dashboards or Grafana® to visualize the trends.

With the combination of Apache Flink®, Apache Kafka®, and Apache Kafka Connect® we have endless possibilities in terms of streaming analytics and integrations.

Apache Kafka® and Apache Flink® allow you to move away from batch processing and embrace streaming while keeping a familiar SQL interface for the pipeline definitions. Moreover, Apache Flink®'s rich SQL syntax allows you to define aggregations, boundaries and temporal limits that would be somewhat hard to define on traditional databases. To know a bit more about this open source combination and the related managed services Aiven is offering:

If you're not using Aiven services yet, go ahead and sign up now for your free trial at https://console.aiven.io/signup!
