Move from batch to streaming with Apache Kafka® and Apache Flink®
Find out how Apache Kafka® and Apache Flink® allow you to move from batch processing to streaming, but keep using SQL in the data pipeline.
If data is the new gold, then data pipelines are the mining shafts and SQL the pickaxe: they let information travel across the company from the sources to the decision-enabling dashboards. But driving the data assets to the right audience is only part of the problem; doing it in a timely manner is becoming more and more critical. The old days of looking at a dashboard of yesterday's data are gone: we need to analyse data, capture trends and detect outliers as soon as possible.
More and more companies are consequently moving away from batch and embracing streaming solutions that enable near real-time data pipelines. To make this transition easier, selecting a target technology that speaks a language similar to the original is usually a sensible choice, since it lets existing skills be reused with small adaptations.
The combination of Apache Kafka® and Apache Flink®, two open source projects aiming respectively at transmitting and computing streaming data, is therefore a good choice: it enables the transition from batch to streaming while keeping the data pipeline definitions in data practitioners' most beloved language: SQL!
We previously wrote about the duo, showcasing how you could write your own streaming data pipelines using a dockerized version of Apache Flink®'s SQL client. This time we'll create new data pipelines for a different use case and show how we can minimize the analytics latency. On top of this, there is more good news: we no longer have to run Apache Flink® ourselves, since we can rely on the managed services provided by Aiven!
You can also watch this tutorial on YouTube:
The use case
For the purpose of this blog post, we are going to mimic an inbound streaming dataset of IoT CPU utilization measurements. Since IoT devices can be geographically distributed, messages can arrive with a delay or potentially out of order. Therefore, before further processing, we want to allow late arrivals of IoT records with a delay of up to 10 seconds. In rough terms this means that we'll wait 10 more seconds before closing any window calculation.
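The late-arrival rule can be pictured with a minimal Python sketch (illustrative only, not Apache Flink®'s actual watermark implementation): track the maximum event time seen so far and drop any record that arrives more than 10 seconds behind it. The event timestamps below are made up for the example.

```python
# Minimal sketch of watermark-style late-event handling (illustrative only,
# not Apache Flink's implementation).
ALLOWED_DELAY = 10  # seconds of allowed lateness

def accept_events(events):
    """Keep events whose timestamp is within ALLOWED_DELAY of the
    maximum event time seen so far; drop anything older."""
    max_seen = float("-inf")
    accepted = []
    for ts, payload in events:
        max_seen = max(max_seen, ts)
        if ts >= max_seen - ALLOWED_DELAY:
            accepted.append((ts, payload))
    return accepted

# Out-of-order input: the 5-second-late record "c" is kept,
# the 15-second-late record "d" is dropped.
events = [(100, "a"), (105, "b"), (100, "c"), (90, "d")]
print(accept_events(events))  # [(100, 'a'), (105, 'b'), (100, 'c')]
```

The key point is that lateness is measured against the furthest event time observed, not against the wall clock.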
When we monitor any type of hardware, we might want to check whether the device health parameters are within an optimal range and, if not, start raising alerts. We can achieve that by creating a data pipeline to filter all high CPU utilization records and push them to a new Apache Kafka® topic, where a downstream consuming application will then trigger the alerts. In the old batch days we would run the same query every few minutes to check for high values; with streaming, we can redirect the problematic records as soon as they appear.
Checking every individual sample against the threshold might be a bit bursty, and we are OK if a single CPU sample goes over the limit. On the other hand, we might want to calculate the average and maximum CPU level at 5-minute intervals, since those metrics can help us identify problematic CPUs under consistent heavy load. To achieve this second monitoring step, we'll create another pipeline aggregating samples over 5-minute windows.
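The idea of 5-minute aggregation windows can be sketched in a few lines of Python (a simplified model, not what Flink does internally): bucket each sample by the start of its tumbling window and compute the average and maximum per host and CPU. The sample values are invented for the example.

```python
from collections import defaultdict

WINDOW = 5 * 60  # 5-minute tumbling windows, in seconds

def window_stats(samples):
    """Group (epoch_seconds, hostname, cpu, usage) samples into tumbling
    windows and return avg/max usage per (window_start, hostname, cpu)."""
    buckets = defaultdict(list)
    for ts, hostname, cpu, usage in samples:
        window_start = ts - ts % WINDOW  # align to the window boundary
        buckets[(window_start, hostname, cpu)].append(usage)
    return {
        key: {"avg_usage": sum(vals) / len(vals), "max_usage": max(vals)}
        for key, vals in buckets.items()
    }

samples = [
    (0,   "dopey", "cpu1", 80.0),
    (120, "dopey", "cpu1", 90.0),
    (400, "dopey", "cpu1", 70.0),  # falls into the next window
]
stats = window_stats(samples)
print(stats[(0, "dopey", "cpu1")])    # avg 85.0, max 90.0
print(stats[(300, "dopey", "cpu1")])  # avg 70.0, max 70.0
```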
Please consider that this use case was also solvable in batch mode, but in that case we would have to start the batch load 10 seconds (the allowed delay) after the 5-minute window and see the results only after the batch run time. If the batch time of collecting data, calculating and storing results is 1 minute, we would discover the end status with 1 minute and 10 seconds of delay.
With Apache Flink®, if the query allows, the window information is captured and calculated incrementally during the window time itself. Therefore, once the 10 seconds delay has elapsed, there is only a minimal overhead for the last computation to finish before emitting the result. For simple queries, we would retrieve the results with just a little more than the 10 seconds of forced delay.
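The incremental computation can be pictured with a running aggregate — a minimal Python sketch (not Flink internals): each event updates the aggregate in constant time, so when the window closes there is essentially nothing left to compute before emitting the result.

```python
class RunningUsage:
    """Incrementally maintained avg/max, so the window result is ready the
    moment the window closes (illustrative sketch, not Flink internals)."""
    def __init__(self):
        self.count = 0
        self.total = 0.0
        self.maximum = float("-inf")

    def add(self, usage):
        # O(1) work per event: nothing is left to do at window close.
        self.count += 1
        self.total += usage
        self.maximum = max(self.maximum, usage)

    def result(self):
        return {"avg_usage": self.total / self.count, "max_usage": self.maximum}

agg = RunningUsage()
for usage in (80.0, 90.0, 85.0):
    agg.add(usage)
print(agg.result())  # {'avg_usage': 85.0, 'max_usage': 90.0}
```

A batch job, by contrast, only starts this work after the window (plus the allowed delay) has ended, which is where the extra latency comes from.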
What we'll notice when writing the two data pipelines is how similar Apache Flink® SQL is to the SQL we would use to query a relational database. This makes the migration from batch processes to streaming a matter of learning a few peculiar tricks of Apache Flink®.
Define the building blocks
We will use Aiven for Apache Kafka® as the data bus and Aiven for Apache Flink® to define some data pipelines in SQL.
Aiven provides a beautiful web console that we can use to create all our services, but for the aim of this blog post we will rely on the Aiven CLI to set up and manage our instances, since it provides a way to script the entire process and produce replicable results.
To make our command lines simpler, we can store the Aiven project name we are using in a shell variable, `AVN_PROJECT_NAME`, replacing the `YOUR_PROJECT_NAME` placeholder with the actual project name. For instance, in `bash`:

```bash
AVN_PROJECT_NAME=YOUR_PROJECT_NAME
```
Once the Aiven CLI is installed, we can start creating the Aiven for Apache Kafka® service by issuing the following command in the terminal:
```bash
avn service create demo-kafka \
  --project $AVN_PROJECT_NAME \
  --service-type kafka \
  --cloud google-europe-west3 \
  --plan business-4 \
  -c kafka.auto_create_topics_enable=true \
  -c kafka_rest=true \
  -c schema_registry=true
```
The above creates an Apache Kafka® instance named `demo-kafka`, located in the `google-europe-west3` region, with the 3-node cluster defined by the `business-4` plan. To store schema information we enable the schema registry via Aiven's Karapace, and we allow querying it via REST calls by enabling Kafka REST. Finally, we allow the automatic creation of topics on the first message, useful for a demo project like this.
Now, we can create the Aiven for Apache Flink® service with:
```bash
avn service create demo-flink \
  --project $AVN_PROJECT_NAME \
  --service-type flink \
  --cloud google-europe-west3 \
  --plan business-4
```
Compared to the previous call, the only differences are the service name (now `demo-flink`) and type (now `flink`).
To complete the setup, we need to connect the two pillars with a service integration, and again the Aiven CLI is our friend.
```bash
avn service integration-create \
  --project $AVN_PROJECT_NAME \
  -t flink \
  -s demo-kafka \
  -d demo-flink
```
In the above, we specify an integration of type `flink`, connecting the data source `demo-kafka` to the Aiven for Apache Flink® service called `demo-flink`.
Define the streaming input
We need some data to play with, and instead of the usual pizza example, we will now use fake metrics reported by IoT (Internet of Things) devices. The beauty is that it's contained in the same GitHub repository that we were using for our previous example, which has a flag called `subject` allowing us to generate various types of fake data.
The associated GitHub repository provides a dockerised version. To start using it, we first clone the repository:

```bash
git clone https://github.com/aiven/fake-data-producer-for-apache-kafka-docker
```
Next, we need to create an access token that will be used by the Docker instance to retrieve the Apache Kafka® service URI and certificates. We can do this with the following Aiven CLI command, using jq to fetch the result:

```bash
avn user access-token create \
  --description "Token used by Fake data generator" \
  --max-age-seconds 3600 \
  --json | jq -r '.[].full_token'
```
The above generates a token valid for 1 hour (3600 seconds). Take note of the command output in the `FULL_TOKEN` field, since now it's time to include it in the repo config file. Within the `fake-data-producer-for-apache-kafka-docker` folder, let's copy `conf/env.conf.sample` to `conf/env.conf` and edit the file with the following content:
```bash
PROJECT_NAME="[YOUR_PROJECT_NAME]"
SERVICE_NAME="demo-kafka"
TOPIC="iot-data-input"
PARTITIONS=1
REPLICATION=2
NR_MESSAGES=0
MAX_TIME=1
SUBJECT="metric"
USERNAME="[YOUR_ACCOUNT]"
TOKEN="[YOUR_ACCESS_TOKEN]"
SECURITY="SSL"
```
Replace `[YOUR_PROJECT_NAME]` with the project name where Aiven for Apache Kafka® is running, and the duo `[YOUR_ACCOUNT]` and `[YOUR_ACCESS_TOKEN]` with the account credentials necessary to attach to Apache Kafka®. If you followed these examples, they would be your email address and the token you generated a few steps above.
Time to build the docker image with:
```bash
docker build -t fake-data-producer-for-apache-kafka-docker .
```
If you change any parameter in the `conf/env.conf` file (like the access token), you need to rebuild the image for the changes to be picked up by the Docker image.
Finally, start the fake IoT data flow with:
```bash
docker run fake-data-producer-for-apache-kafka-docker
```
After a few seconds we should see a continuous stream of messages being created in the Apache Kafka® topic named `iot-data-input`. We can check them using kcat. Use the instructions in the dedicated doc page to install kcat and set up the `kcat.config` configuration file, and then check the data flowing in Apache Kafka® with:

```bash
kcat -F kcat.config -C -t iot-data-input
```
We should see a stream of IoT device cpu usage measurements.
Define a data filtering pipeline
For the rest of the post, we'll need the ID of the integration we just created. To retrieve it, we can issue the following command, relying on the Aiven CLI and some `jq` filtering of the results:

```bash
KAFKA_FLINK_SI=$( \
  avn service integration-list \
  demo-flink \
  --project $AVN_PROJECT_NAME \
  --json \
  | jq -r '.[] | select(.source == "demo-kafka") | .service_integration_id')
```
The above command lists the integrations, filters for the one pointing to the `demo-kafka` service, and stores its ID in the `KAFKA_FLINK_SI` variable, which we'll use later.
The next step is to set up a streaming data pipeline in Apache Flink®. Aiven for Apache Flink® allows us to define data pipelines as applications, containing all the metadata definitions such as tables, SQL transformations and deployment parameters.
The following Aiven CLI command allows us to create an application called "Filtering_Data_Pipeline":
```bash
avn service flink create-application demo-flink \
  --project $AVN_PROJECT_NAME \
  "{\"name\":\"Filtering_Data_Pipeline\"}"
```
Define an Aiven for Apache Flink application
Now that we've got a new Apache Flink® application named `Filtering_Data_Pipeline`, we can retrieve the application id, which we'll need later on, with:

```bash
FLINK_APP_ID=$(avn service flink list-applications demo-flink \
  --project $AVN_PROJECT_NAME \
  | jq -r '.applications[] | select(.name == "Filtering_Data_Pipeline") | .id')
```
The above command lists the applications, filters for the one named `Filtering_Data_Pipeline`, and stores its ID in the `FLINK_APP_ID` variable, which we'll use later.
Define an Aiven for Apache Flink application version
Now it's time to create an application version. The versioning system allows us to quickly evolve the data pipeline definition, and to travel back in history if some changes are not in line with expectations. In the application version we need to specify:
- The list of sources, together with their service integration id that defines which other Aiven services will be used to pull the data
- The sink, together with its service integration id that defines which other Aiven service will be used to sink the data
- The transformation SQL statement that defines how to reshape the data.
The overall structure of the application in JSON format is:
```json
{
  "sources": [
    {
      "create_table": "CREATE TABLE SRC_TBL1....",
      "integration_id": "INTEGRATION_ID1"
    },
    {
      "create_table": "CREATE TABLE SRC_TBL2....",
      "integration_id": "INTEGRATION_ID1"
    }
  ],
  "sinks": [
    {
      "create_table": "CREATE TABLE SINK_TBL1....",
      "integration_id": "INTEGRATION_ID3"
    }
  ],
  "statement": "INSERT INTO SINK_TBL1 SELECT ..."
}
```
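If the shell escaping required by the CLI call gets unwieldy, the same payload can be assembled programmatically. A hedged Python sketch, with a placeholder integration id and elided DDL strings (the real `CREATE TABLE` statements appear later in the post):

```python
import json

# Hypothetical value of the $KAFKA_FLINK_SI variable; the DDL strings
# below are placeholders, not the full table definitions.
integration_id = "abc-123"

payload = {
    "sources": [
        {"create_table": "CREATE TABLE iot_in (...)",
         "integration_id": integration_id},
    ],
    "sinks": [
        {"create_table": "CREATE TABLE iot_filtered_alert (...)",
         "integration_id": integration_id},
    ],
    "statement": "INSERT INTO iot_filtered_alert SELECT ... FROM iot_in",
}

# json.dumps handles quoting, so no manual backslash-escaping is needed.
body = json.dumps(payload)
print(json.loads(body)["sources"][0]["integration_id"])  # abc-123
```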
Back to our example, we can create the application version with the following command:
```bash
avn service flink create-application-version demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID \
  """{
    \"sources\": [
      {
        \"create_table\": \"CREATE TABLE iot_in (\n\
          hostname VARCHAR,\n\
          cpu VARCHAR,\n\
          usage DOUBLE,\n\
          occurred_at BIGINT,\n\
          time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),\n\
          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' seconds\n\
          )\n\
          WITH (\n\
          'connector' = 'kafka',\n\
          'properties.bootstrap.servers' = '',\n\
          'scan.startup.mode' = 'earliest-offset',\n\
          'value.fields-include' = 'ALL',\n\
          'topic' = 'iot-data-input',\n\
          'value.format' = 'json'\n\
          )\",
        \"integration_id\": \"$KAFKA_FLINK_SI\"
      }
    ],
    \"sinks\": [
      {
        \"create_table\": \"CREATE TABLE iot_filtered_alert (\n\
          hostname VARCHAR,\n\
          time_ltz TIMESTAMP(3),\n\
          cpu VARCHAR,\n\
          usage DOUBLE)\n\
          WITH (\n\
          'connector' = 'kafka',\n\
          'properties.bootstrap.servers' = '',\n\
          'scan.startup.mode' = 'earliest-offset',\n\
          'topic' = 'iot-filtered-alert',\n\
          'value.format' = 'json'\n\
          )\",
        \"integration_id\": \"$KAFKA_FLINK_SI\"
      }
    ],
    \"statement\": \"insert into iot_filtered_alert\n\
      select\n\
      hostname,\n\
      time_ltz,\n\
      cpu,\n\
      usage\n\
      from\n\
      iot_in\n\
      where usage > 90\"
  }"""
```
The above command generates an Apache Flink® application version in `demo-flink` composed of:

- A source table called `iot_in`, using the integration id with the Apache Kafka® service stored in `$KAFKA_FLINK_SI`.

  The schema of the table maps the fields `hostname`, `cpu`, `usage` and `occurred_at`. Moreover it sets a new field, called `time_ltz`, which casts the `occurred_at` timestamp in linux format to the native timestamp used by Apache Flink®. Finally, the `WATERMARK` section allows late events to arrive: we are enabling the IoT measurements to be included as valid and considered for the downstream data pipelines if they arrive within a `10` seconds delay. We can notice that, apart from the peculiar `WATERMARK` definition, the rest of the SQL definition is really similar to what we would use in a relational database.

  Some things to notice in the source table definition `WITH` section:

  - it reads from the beginning (`earliest-offset`) of the topic `iot-data-input`
  - as `properties.bootstrap.servers` is empty, the connection information will be fetched from the integration definition
  - it uses the standard `kafka` connector (check more about the different Apache Kafka® connectors in the dedicated documentation)
  - it defines the inbound source of data to be in `json` format.

- A sink table called `iot_filtered_alert`, using the integration id with the Apache Kafka® service stored in `$KAFKA_FLINK_SI`.

  The schema of the table includes the fields `hostname`, `time_ltz`, `cpu` and `usage`. It defines the target of the data pipeline as a topic named `iot-filtered-alert` in Apache Kafka® (with the `$KAFKA_FLINK_SI` integration id), using similar settings for the connector, startup mode and value format as the source table.

- A SQL transformation that selects the `hostname`, `time_ltz`, `cpu` and `usage` fields from the source `iot_in` table, filters for `usage` values greater than `90`, and inserts the data into `iot_filtered_alert`.
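The filtering statement's logic can be sketched in plain Python over records shaped like the `iot_in` table (the sample values are invented):

```python
# The "where usage > 90" filter of the SQL statement, sketched in Python
# over records shaped like the iot_in table (sample values are made up).
records = [
    {"hostname": "dopey", "time_ltz": "2023-02-08 11:05:23", "cpu": "cpu4", "usage": 90.5},
    {"hostname": "happy", "time_ltz": "2023-02-08 11:05:25", "cpu": "cpu1", "usage": 45.2},
    {"hostname": "doc",   "time_ltz": "2023-02-08 11:05:32", "cpu": "cpu2", "usage": 95.7},
]

alerts = [r for r in records if r["usage"] > 90]
print([r["hostname"] for r in alerts])  # ['dopey', 'doc']
```

In the real pipeline, Flink applies this predicate to each record as it arrives, so an alert record lands in the sink topic almost immediately.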
Deploy an Aiven for Apache Flink application version
At this point, the first version of our application is ready to be deployed. The deployment stage allows us to actually start the data pipeline by submitting the application to the Apache Flink service for it to be run. We need to fetch the application version id with:
```bash
FLINK_APP_VERSION_ID=$(avn service flink get-application demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID \
  | jq -r '.application_versions[] | select(.version == 1) | .id')
```
The above command retrieves the application definition, and with `jq` filters for version 1 (`select(.version == 1)`) and extracts the `id`.
Now we are ready to deploy our application, with the following command:
```bash
avn service flink create-application-deployment demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID \
  "{\"parallelism\": 1,\"restart_enabled\": true, \"version_id\": \"$FLINK_APP_VERSION_ID\"}"
```
We can check the deployment status with:
```bash
avn service flink get-application demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID | jq '.current_deployment.status'
```
Once the application deployment is in the `RUNNING` state, filtered data should be pushed to the Apache Kafka® topic named `iot-filtered-alert`. We can check with `kcat` and the following call:

```bash
kcat -F kcat.config -C -t iot-filtered-alert
```
Only measurements with over 90% utilization should appear in the `iot-filtered-alert` topic, similar to the below:
```json
{"hostname":"dopey","time_ltz":"2023-02-08 11:05:23.319","cpu":"cpu4","usage":90.51760978462883}
{"hostname":"happy","time_ltz":"2023-02-08 11:05:25.954","cpu":"cpu4","usage":99.6863297672615}
{"hostname":"sneezy","time_ltz":"2023-02-08 11:05:30.701","cpu":"cpu5","usage":97.19928378993606}
{"hostname":"doc","time_ltz":"2023-02-08 11:05:32.142","cpu":"cpu2","usage":95.69989296729409}
{"hostname":"doc","time_ltz":"2023-02-08 11:05:32.773","cpu":"cpu3","usage":94.04115316937872}
{"hostname":"grumpy","time_ltz":"2023-02-08 11:05:34.052","cpu":"cpu2","usage":95.45458336597062}
{"hostname":"bashful","time_ltz":"2023-02-08 11:05:34.506","cpu":"cpu1","usage":93.83903103097724}
```
Great, we built our first filtering pipeline redirecting problematic samples to a new Apache Kafka® topic where downstream applications can access them and generate the required alerts. It's streaming, so the alerts are detected and propagated in near real time.
Define an aggregation pipeline
In the second scenario we want to define an aggregation over the IoT metrics, calculating the average and maximum usage over 5-minute windows to help us smooth the bursty sample behavior. Following similar steps to the above, we can:
- Define a new Aiven for Apache Flink® application with:

  ```bash
  avn service flink create-application demo-flink \
    --project $AVN_PROJECT_NAME \
    "{\"name\":\"Windowing_Data_Pipeline\"}"
  ```
- Get the application id with:

  ```bash
  FLINK_APP_ID=$(avn service flink list-applications demo-flink \
    --project $AVN_PROJECT_NAME \
    | jq -r '.applications[] | select(.name == "Windowing_Data_Pipeline") | .id')
  ```
- Define the new application version with:

  ```bash
  avn service flink create-application-version demo-flink \
    --project $AVN_PROJECT_NAME \
    --application-id $FLINK_APP_ID \
    """{
      \"sources\": [
        {
          \"create_table\": \"CREATE TABLE iot_in (\n\
            hostname VARCHAR,\n\
            cpu VARCHAR,\n\
            usage DOUBLE,\n\
            occurred_at BIGINT,\n\
            time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),\n\
            WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' seconds\n\
            )\n\
            WITH (\n\
            'connector' = 'kafka',\n\
            'properties.bootstrap.servers' = '',\n\
            'scan.startup.mode' = 'earliest-offset',\n\
            'value.fields-include' = 'ALL',\n\
            'topic' = 'iot-data-input',\n\
            'value.format' = 'json'\n\
            )\",
          \"integration_id\": \"$KAFKA_FLINK_SI\"
        }
      ],
      \"sinks\": [
        {
          \"create_table\": \"CREATE TABLE iot_avg_out (\n\
            window_start TIMESTAMP(3),\n\
            window_end TIMESTAMP(3),\n\
            hostname VARCHAR,\n\
            cpu VARCHAR,\n\
            avg_usage DOUBLE,\n\
            max_usage DOUBLE\n\
            )\n\
            WITH (\n\
            'connector' = 'kafka',\n\
            'properties.bootstrap.servers' = '',\n\
            'scan.startup.mode' = 'earliest-offset',\n\
            'topic' = 'iot-avg-output',\n\
            'value.format' = 'json'\n\
            )\",
          \"integration_id\": \"$KAFKA_FLINK_SI\"
        }
      ],
      \"statement\": \"insert into iot_avg_out\n\
        select\n\
        window_start,\n\
        window_end,\n\
        hostname,\n\
        cpu,\n\
        avg(usage),\n\
        max(usage)\n\
        from\n\
        TABLE(TUMBLE(TABLE iot_in, DESCRIPTOR(time_ltz), INTERVAL '5' MINUTES))\n\
        group by\n\
        window_start,\n\
        window_end,\n\
        hostname,\n\
        cpu\"
    }"""
  ```

  Note that the above command reuses the same definition for the `iot_in` table as the previous example, but defines a new Apache Flink® table called `iot_avg_out`, pointing to the topic `iot-avg-output` in JSON format. The `iot_avg_out` schema includes the `window_start` and `window_end` columns defining the boundaries of the window calculated using the event time, the IoT device `hostname` and `cpu`, and the aggregated columns for average and maximum usage.

  The `statement` section defines the 5-minute time windows, based on the event time column `time_ltz`, with the `TUMBLE` option (check out the dedicated window documentation), and performs the aggregations. We can notice again that, apart from the custom `TUMBLE` definition, the SQL statement is really similar to one we could use in a relational database.
definition, the SQL statement is really similar to one we could use in a relational database. -
Get the application version id with:
FLINK_APP_VERSION_ID=$(avn service flink get-application demo-flink \ --project $AVN_PROJECT_NAME \ --application-id $FLINK_APP_ID \ | jq -r '.application_versions[] | select(.version == 1) | .id')
- Deploy the application with:

  ```bash
  avn service flink create-application-deployment demo-flink \
    --project $AVN_PROJECT_NAME \
    --application-id $FLINK_APP_ID \
    "{\"parallelism\": 1,\"restart_enabled\": true, \"version_id\": \"$FLINK_APP_VERSION_ID\"}"
  ```
We can check the deployment status with:
```bash
avn service flink get-application demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID | jq '.current_deployment.status'
```
Once the status is `RUNNING`, we can check the aggregated results being pushed to the `iot-avg-output` Apache Kafka® topic using `kcat` and the following command:

```bash
kcat -F kcat.config -C -t iot-avg-output
```
The result should be similar to the following, showing the window start and end times, the IoT device hostname, cpu, and the aggregated utilization metrics:
```json
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"dopey","cpu":"cpu2","avg_usage":83.75267676383785,"max_usage":98.69540798403519}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"happy","cpu":"cpu2","avg_usage":86.92580172018437,"max_usage":99.19071554040988}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"happy","cpu":"cpu1","avg_usage":88.89410118795028,"max_usage":98.63471212455435}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"dopey","cpu":"cpu4","avg_usage":84.0739509212965,"max_usage":98.74529247031236}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"grumpy","cpu":"cpu4","avg_usage":79.39103768789909,"max_usage":91.3979757069514}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"sneezy","cpu":"cpu2","avg_usage":84.2944495941075,"max_usage":99.55062006322083}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"grumpy","cpu":"cpu3","avg_usage":82.57313518548608,"max_usage":99.47190111994642}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"sneezy","cpu":"cpu4","avg_usage":86.71661332259018,"max_usage":99.61266191120228}
```
Now that we created our 5-minute windows topic, we could iterate again on our monitoring solution by adding a follow-up Apache Flink® job filtering only the data going over a threshold. The beauty of this approach is that we can plug in one step of our solution at a time, storing the intermediate data in the common bus represented by Apache Kafka®.
Furthermore, since the data is stored in an Apache Kafka® topic, we could use Apache Kafka Connect® to sink the data to OpenSearch® and use OpenSearch Dashboards or Grafana® to visualize the trends.
With the combination of Apache Flink®, Apache Kafka®, and Apache Kafka Connect® we have endless possibilities in terms of streaming analytics and integrations.
Next steps
Apache Kafka® and Apache Flink® allow you to move away from batch processing and embrace streaming while keeping a familiar SQL interface for the pipeline definitions. Even more, Apache Flink®'s rich SQL syntax allows you to define aggregations, boundaries and temporal limits that would be somewhat hard to define in traditional databases. To learn a bit more about this open source combination and the related managed services Aiven offers:
- Check out Aiven for Apache Kafka®
- Check out Aiven for Apache Flink®
- Understand the difference between Event time and Processing time
- Learn all the available Apache Flink® SQL functions
If you're not using Aiven services yet, go ahead and sign up now for your free trial at https://console.aiven.io/signup!