Move from batch to streaming with Apache Kafka® and Apache Flink®

Find out how Apache Kafka® and Apache Flink® allow you to move from batch processing to streaming, but keep using SQL in the data pipeline.

This post, originally published in February 2022, has been updated to the new Aiven for Apache Flink®️ API and developer experience, as released in February 2023.

If data is the new gold, then data pipelines are the mining shafts and SQL the pickaxe: together they allow information to travel across the company, from the sources to the decision-enabling dashboards. But driving the data assets to the right audience is only part of the problem; doing it in a timely manner is becoming more and more critical. The old days of watching a dashboard today that contains yesterday's data are gone: we need to analyse data, capture trends and detect outliers as soon as possible.

More and more companies are consequently moving away from batch and embracing streaming solutions that enable near real-time data pipelines. To make this transition easier, selecting a target technology that speaks a language similar to the original one is usually a sensible choice, since it lets existing skills be reused with only small adaptations.

The combination of Apache Kafka® and Apache Flink®, two open source projects aiming respectively at transmitting and computing streaming data, is therefore a good choice, since it enables the transition from batch to streaming while keeping the data pipeline definitions in the data practitioners' most beloved language: SQL!

We previously wrote about the duo, showcasing how you could write your own streaming data pipelines using a dockerized version of Apache Flink®'s SQL client. This time we'll create new data pipelines for a different use case and show how we can minimize analytics latency. On top of this, there is more good news: we don't have to care anymore about running Apache Flink® ourselves, since we can rely on the managed services provided by Aiven!

You can also watch this tutorial on YouTube.

The use case

For the purpose of this blog post, we are going to mimic an inbound streaming dataset of IoT CPU utilization measurements. Since IoT devices can be geographically distributed, messages can arrive with a delay or potentially out of order; thus, before further processing, we want to allow late arrivals of IoT records with a delay of up to 10 seconds. In rough terms this means that we'll wait 10 extra seconds before finalizing any window calculation.
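To picture what this allowed lateness means in practice, here is a minimal Python sketch (an illustration only, not Flink code) of a tumbling-window buffer that only closes a window once the watermark, i.e. the highest event time seen minus the 10-second lateness, has passed the window's end:

```python
from collections import defaultdict

ALLOWED_LATENESS_MS = 10_000  # 10 seconds, matching the WATERMARK definition used later

def close_ready_windows(events, window_size_ms=300_000):
    """Group (event_time_ms, value) pairs into tumbling windows, emitting a
    window only once the watermark has passed the window's end, so events
    up to 10 seconds late are still counted in the correct window."""
    windows = defaultdict(list)   # window_start -> collected values
    watermark = float("-inf")
    emitted = {}
    for event_time, value in events:
        # The watermark trails the newest event by the allowed lateness
        watermark = max(watermark, event_time - ALLOWED_LATENESS_MS)
        start = event_time - event_time % window_size_ms
        windows[start].append(value)
        # Close every window whose end is now behind the watermark
        for s in sorted(windows):
            if s + window_size_ms <= watermark:
                emitted[s] = windows.pop(s)
    return emitted, windows
```

In this sketch the sample arriving at time 950 after the one at 1200 is still assigned to its original window, because the window stays open until the watermark passes it.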

When we monitor any type of hardware, we might want to check whether the device health parameters are within an optimal range and, if not, start raising alerts. We can achieve that by creating a data pipeline that filters all high cpu utilization records and pushes them to a new Apache Kafka® topic, where a downstream consuming application will then trigger the alerts. In the old batch days we would run the same query every few minutes to check for high values; with streaming we can redirect the problematic records as soon as they appear.

Checking every individual sample against the threshold might generate bursty alerts, and we are OK if a single cpu sample occasionally goes over the limit. On the other hand, we might want to calculate the average and maximum cpu level at 5-minute intervals, since those metrics can help us identify problematic CPUs with consistently heavy load. To achieve this second monitoring step, we'll create another pipeline aggregating samples over 5-minute windows.
Please consider that this use case was also solvable in batch mode, but in that case we would have had to start the batch load 10 seconds (the allowed delay) after the 5-minute window closed, and we would see the results only after the batch finished. If the batch time for collecting data, calculating and storing results is 1 minute, we would discover the final status with 1 minute and 10 seconds of delay.

With Apache Flink®, if the query allows it, the window result is captured and calculated incrementally during the window time itself. Therefore, after the 10-second delay has passed, there is only a minimal overhead for the last computation to finish before emitting the result. For simple queries, we would retrieve the results with just a little more than the 10 seconds of forced delay.
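The incremental computation can be pictured with a small Python sketch (again an illustration, not Flink's actual implementation): the accumulator updates the average and maximum on every incoming sample, so producing the final result when the window closes is practically instantaneous:

```python
class CpuWindowAccumulator:
    """Incremental window accumulator: avg and max are updated per event,
    so closing the window is O(1) instead of a full recomputation over
    all buffered samples."""

    def __init__(self):
        self.count = 0
        self.total = 0.0
        self.maximum = float("-inf")

    def add(self, usage):
        # Called once per incoming sample, during the window itself
        self.count += 1
        self.total += usage
        self.maximum = max(self.maximum, usage)

    def result(self):
        # Called once, when the watermark closes the window
        return {"avg_usage": self.total / self.count, "max_usage": self.maximum}
```

This is why the forced 10-second delay dominates the end-to-end latency: the per-window state is already up to date when the window closes.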

What we'll notice when writing the two data pipelines is how similar Apache Flink® SQL is to the SQL we would use to query a relational database. This makes the migration from batch processes to streaming a matter of learning a few of Apache Flink's peculiar tricks.

Define the building blocks

We will use Aiven for Apache Kafka® as the data bus and Aiven for Apache Flink® to define some data pipelines in SQL.
Aiven provides a beautiful web console that we could use to create all our services, but for the purposes of this blog post we will rely on the Aiven CLI to set up and manage our instances, since it provides a way to script the entire process and produce replicable results.

To make our command lines simpler, we can store the Aiven project name we are using in a shell variable, AVN_PROJECT_NAME, replacing the YOUR_PROJECT_NAME placeholder with the actual project name. For instance, in bash:

AVN_PROJECT_NAME=YOUR_PROJECT_NAME

Once the Aiven CLI is installed, we can start creating the Aiven for Apache Kafka® service by issuing the following command in the terminal:

avn service create demo-kafka \
  --project $AVN_PROJECT_NAME \
  --service-type kafka \
  --cloud google-europe-west3 \
  --plan business-4 \
  -c kafka.auto_create_topics_enable=true \
  -c kafka_rest=true \
  -c schema_registry=true

The above creates an Apache Kafka® instance named demo-kafka located in the google-europe-west3 region, with a 3-node cluster as defined by the business-4 plan. To store schema information we enable the schema registry via Aiven's Karapace, and we allow querying it via REST calls by enabling Kafka REST. Finally, we allow the automatic creation of topics on the first message, which is useful for a demo project like this.

Now, we can create the Aiven for Apache Flink® service with:

avn service create demo-flink \
  --project $AVN_PROJECT_NAME \
  --service-type flink \
  --cloud google-europe-west3 \
  --plan business-4

Compared to the previous call, the only difference is the service name (now demo-flink) and type (now flink).

To complete the setup, we need to connect the two pillars with a service integration, and again the Aiven CLI is our friend.

avn service integration-create \
  --project $AVN_PROJECT_NAME \
  -t flink \
  -s demo-kafka \
  -d demo-flink

In the above, we specify an integration of type flink connecting the data source demo-kafka to the Aiven for Apache Flink® service called demo-flink.

Define the streaming input

We need some data to play with, and instead of the usual pizza example, we will now use fake metrics reported by IoT (Internet of Things) devices. Conveniently, the generator is contained in the same GitHub repository that we used for our previous example, which has a flag called subject that allows us to generate various types of fake data.

The associated GitHub repository provides a dockerised version. To start using it, we first clone the repository:

git clone https://github.com/aiven/fake-data-producer-for-apache-kafka-docker

Next, we need to create an access token that will be used by the Docker instance to retrieve the Apache Kafka® service URI and certificates, using the following Aiven CLI command and jq to fetch the results:

avn user access-token create \
  --description "Token used by Fake data generator" \
  --max-age-seconds 3600 \
  --json | jq -r '.[].full_token'

The above generates a token valid for 1 hour (3600 seconds). Take note of the FULL_TOKEN field in the command output, since now it's time to include it in the repo config file. Within the fake-data-producer-for-apache-kafka-docker folder, copy conf/env.conf.sample to conf/env.conf and edit the file with the following content:

PROJECT_NAME="[YOUR_PROJECT_NAME]"
SERVICE_NAME="demo-kafka"
TOPIC="iot-data-input"
PARTITIONS=1
REPLICATION=2
NR_MESSAGES=0
MAX_TIME=1
SUBJECT="metric"
USERNAME="[YOUR_ACCOUNT]"
TOKEN="[YOUR_ACCESS_TOKEN]"
SECURITY="SSL"

Replace [YOUR_PROJECT_NAME] with the name of the project where Aiven for Apache Kafka® is running, and the duo [YOUR_ACCOUNT] and [YOUR_ACCESS_TOKEN] with the account credentials needed to connect to Apache Kafka®. If you followed these examples, they are your email address and the token you generated a few steps above.

Time to build the Docker image with:

docker build -t fake-data-producer-for-apache-kafka-docker .

If you change any parameter in the conf/env.conf file (like the access token), you need to rebuild the image for the changes to be picked up.

Finally, start the fake IoT data flow with:

docker run fake-data-producer-for-apache-kafka-docker

After a few seconds we should see a continuous flow of messages being created in the Apache Kafka® topic named iot-data-input. We can check them using kcat. Use the instructions in the dedicated doc page to install it and set up the kcat.config configuration file, and then check the data flowing into Apache Kafka® with:

kcat -F kcat.config -C -t iot-data-input

We should see a stream of IoT device cpu usage measurements.

Define a data filtering pipeline

For the rest of the post, we'll need the ID of the integration we just created. To retrieve it, we can issue the following command, relying on the Aiven CLI and some jq filtering of the results:

KAFKA_FLINK_SI=$( \
  avn service integration-list \
  demo-flink \
  --project $AVN_PROJECT_NAME \
  --json \
  | jq -r '.[] | select(.source == "demo-kafka") | .service_integration_id')

The above command lists the integrations, filters the one pointing to the demo-kafka service, and stores its ID in the KAFKA_FLINK_SI variable, which we'll use later.

The next step is to set up a streaming data pipeline in Apache Flink®. Aiven for Apache Flink allows us to define data pipelines as applications, containing all the metadata definitions such as tables, the SQL transformation and the deployment parameters.

The following Aiven CLI command allows us to create an application called "Filtering_Data_Pipeline":

avn service flink create-application demo-flink \
  --project $AVN_PROJECT_NAME \
  "{\"name\":\"Filtering_Data_Pipeline\"}"

Now that we've got a new Apache Flink application named Filtering_Data_Pipeline, we can retrieve the application id, which we'll need later on, with:

FLINK_APP_ID=$(avn service flink list-applications demo-flink \
  --project $AVN_PROJECT_NAME \
  | jq -r '.applications[] | select(.name == "Filtering_Data_Pipeline") | .id')

The above command lists the applications, filters the one named Filtering_Data_Pipeline, and stores its ID in the FLINK_APP_ID variable, which we'll use later.

Now it's time to create an application version. The versioning system allows us to quickly evolve the data pipeline definition, and to travel back in history if some changes are not in line with expectations. In the application version we need to specify:

  • The list of sources, together with their service integration id that defines which other Aiven services will be used to pull the data
  • The sink, together with its service integration id that defines which other Aiven service will be used to sink the data
  • The transformation SQL statement that defines how to reshape the data.

The overall structure of the application in JSON format is:

{ "sources": [ { "create_table":"CREATE TABLE SRC_TBL1....", "integration_id":"INTEGRATION_ID1" }, { "create_table":"CREATE TABLE SRC_TBL2....", "integration_id":"INTEGRATION_ID1" } ], "sinks": [ { "create_table":"CREATE TABLE SINK_TBL1....", "integration_id":"INTEGRATION_ID3" } ], "statement": "INSERT INTO SINK_TBL1 SELECT ..." }

Back to our example, we can create the application version with the following command:

avn service flink create-application-version demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID \
  """{
  \"sources\": [
    {
      \"create_table\": \"CREATE TABLE iot_in (\n\
        hostname VARCHAR,\n\
        cpu VARCHAR,\n\
        usage DOUBLE,\n\
        occurred_at BIGINT,\n\
        time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),\n\
        WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' seconds\n\
      )\n\
      WITH (\n\
        'connector' = 'kafka',\n\
        'properties.bootstrap.servers' = '',\n\
        'scan.startup.mode' = 'earliest-offset',\n\
        'value.fields-include' = 'ALL',\n\
        'topic' = 'iot-data-input',\n\
        'value.format' = 'json'\n\
      )\",
      \"integration_id\": \"$KAFKA_FLINK_SI\"
    }
  ],
  \"sinks\": [
    {
      \"create_table\": \"CREATE TABLE iot_filtered_alert (\n\
        hostname VARCHAR,\n\
        time_ltz TIMESTAMP(3),\n\
        cpu VARCHAR,\n\
        usage DOUBLE)\n\
      WITH (\n\
        'connector' = 'kafka',\n\
        'properties.bootstrap.servers' = '',\n\
        'scan.startup.mode' = 'earliest-offset',\n\
        'topic' = 'iot-filtered-alert',\n\
        'value.format' = 'json'\n\
      )\",
      \"integration_id\": \"$KAFKA_FLINK_SI\"
    }
  ],
  \"statement\": \"insert into iot_filtered_alert\n\
    select\n\
      hostname,\n\
      time_ltz,\n\
      cpu,\n\
      usage\n\
    from\n\
      iot_in\n\
    where usage > 90\"
}"""

The above command generates an Apache Flink® application version in demo-flink composed of:

  1. A source table called iot_in using the integration id with the Apache Kafka® service stored in the $KAFKA_FLINK_SI variable.

    The schema of the table maps the fields hostname, cpu, usage and occurred_at. Moreover, it adds a new field, called time_ltz, which casts the occurred_at epoch timestamp to the native timestamp format used by Apache Flink®. Finally, the WATERMARK section allows late events to arrive: IoT measurements are considered valid for the downstream data pipelines if they arrive within a 10-second delay. We can notice that, apart from the peculiar WATERMARK definition, the rest of the SQL definition is very similar to what we would use in a relational database.

    Some things to notice in the source table definition WITH section:

    • it reads from the beginning (earliest-offset) of the topic iot-data-input
    • as the properties.bootstrap.servers is empty, the connection information will be fetched from the integration definition
    • it uses the standard kafka connector (check more about the different Apache Kafka® connectors in the dedicated documentation)
    • it defines the inbound source of data to be json format.
  2. A sink table called iot_filtered_alert using the integration id with the Apache Kafka® service stored in the $KAFKA_FLINK_SI variable.

    The schema of the table includes the fields hostname, time_ltz, cpu and usage. It defines the target of the data pipeline as a topic named iot-filtered-alert in Apache Kafka (via the $KAFKA_FLINK_SI integration id), using similar settings for the connector, startup mode and value format as the source table.

  3. A SQL transformation that selects the hostname, time_ltz, cpu and usage fields from the source iot_in table, filters for usage values greater than 90, and inserts the data into the iot_filtered_alert table.
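The filtering logic itself is simple enough to sketch outside Flink; the following Python snippet mimics what the INSERT ... WHERE usage > 90 statement does to each record (for illustration only, the real work happens in the Flink job):

```python
def filter_high_usage(records, threshold=90.0):
    """Mimic the Flink filtering statement: keep only samples whose usage
    exceeds the threshold, projecting the same four fields the sink
    table iot_filtered_alert expects."""
    return [
        {"hostname": r["hostname"], "time_ltz": r["time_ltz"],
         "cpu": r["cpu"], "usage": r["usage"]}
        for r in records
        if r["usage"] > threshold
    ]
```

Each record that passes the predicate would be serialized back to JSON and land in the iot-filtered-alert topic; everything else is simply dropped.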

At this point, the first version of our application is ready to be deployed. The deployment stage allows us to actually start the data pipeline by submitting the application to the Apache Flink service for it to be run. We need to fetch the application version id with:

FLINK_APP_VERSION_ID=$(avn service flink get-application demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID \
  | jq -r '.application_versions[] | select(.version == 1) | .id')

The above command retrieves the application definition and, with jq, filters for version 1 (select(.version == 1)) and extracts the id.

Now we are ready to deploy our application, with the following command:

avn service flink create-application-deployment demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID \
  "{\"parallelism\": 1,\"restart_enabled\": true, \"version_id\": \"$FLINK_APP_VERSION_ID\"}"

We can check the deployment status with:

avn service flink get-application demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID | jq '.current_deployment.status'

Once the application deployment is in RUNNING state, filtered data should be pushed to the Apache Kafka® topic named iot-filtered-alert. We can check with kcat and the following call:

kcat -F kcat.config -C -t iot-filtered-alert

Only measurements with over 90% utilization should appear in the iot-filtered-alert topic, similar to the below:

{"hostname":"dopey","time_ltz":"2023-02-08 11:05:23.319","cpu":"cpu4","usage":90.51760978462883}
{"hostname":"happy","time_ltz":"2023-02-08 11:05:25.954","cpu":"cpu4","usage":99.6863297672615}
{"hostname":"sneezy","time_ltz":"2023-02-08 11:05:30.701","cpu":"cpu5","usage":97.19928378993606}
{"hostname":"doc","time_ltz":"2023-02-08 11:05:32.142","cpu":"cpu2","usage":95.69989296729409}
{"hostname":"doc","time_ltz":"2023-02-08 11:05:32.773","cpu":"cpu3","usage":94.04115316937872}
{"hostname":"grumpy","time_ltz":"2023-02-08 11:05:34.052","cpu":"cpu2","usage":95.45458336597062}
{"hostname":"bashful","time_ltz":"2023-02-08 11:05:34.506","cpu":"cpu1","usage":93.83903103097724}

Great, we built our first filtering pipeline, redirecting problematic samples to a new Apache Kafka® topic where downstream applications can access them and generate the required alerts. Since it's streaming, the alerts are detected and propagated in near real time.

Define an aggregation pipeline

In the second scenario we want to define an aggregation over the IoT metrics, calculating the average and maximum usage over 5-minute windows to help us smooth out the bursty sample behavior. Following similar steps to the above, we can:

  1. Define a new Aiven for Apache Flink application with:

    avn service flink create-application demo-flink \
      --project $AVN_PROJECT_NAME \
      "{\"name\":\"Windowing_Data_Pipeline\"}"
  2. Get the application id with:

    FLINK_APP_ID=$(avn service flink list-applications demo-flink \
      --project $AVN_PROJECT_NAME \
      | jq -r '.applications[] | select(.name == "Windowing_Data_Pipeline") | .id')
  3. Define the new application version with:

    avn service flink create-application-version demo-flink \
      --project $AVN_PROJECT_NAME \
      --application-id $FLINK_APP_ID \
      """{
      \"sources\": [
        {
          \"create_table\": \"CREATE TABLE iot_in (\n\
            hostname VARCHAR,\n\
            cpu VARCHAR,\n\
            usage DOUBLE,\n\
            occurred_at BIGINT,\n\
            time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),\n\
            WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' seconds\n\
          )\n\
          WITH (\n\
            'connector' = 'kafka',\n\
            'properties.bootstrap.servers' = '',\n\
            'scan.startup.mode' = 'earliest-offset',\n\
            'value.fields-include' = 'ALL',\n\
            'topic' = 'iot-data-input',\n\
            'value.format' = 'json'\n\
          )\",
          \"integration_id\": \"$KAFKA_FLINK_SI\"
        }
      ],
      \"sinks\": [
        {
          \"create_table\": \"CREATE TABLE iot_avg_out (\n\
            window_start TIMESTAMP(3),\n\
            window_end TIMESTAMP(3),\n\
            hostname VARCHAR,\n\
            cpu VARCHAR,\n\
            avg_usage DOUBLE,\n\
            max_usage DOUBLE\n\
          )\n\
          WITH (\n\
            'connector' = 'kafka',\n\
            'properties.bootstrap.servers' = '',\n\
            'scan.startup.mode' = 'earliest-offset',\n\
            'topic' = 'iot-avg-output',\n\
            'value.format' = 'json'\n\
          )\",
          \"integration_id\": \"$KAFKA_FLINK_SI\"
        }
      ],
      \"statement\": \"insert into iot_avg_out\n\
        select\n\
          window_start,\n\
          window_end,\n\
          hostname,\n\
          cpu,\n\
          avg(usage),\n\
          max(usage)\n\
        from\n\
          TABLE(TUMBLE( TABLE iot_in, DESCRIPTOR(time_ltz), INTERVAL '5' MINUTES))\n\
        group by\n\
          window_start,\n\
          window_end,\n\
          hostname,\n\
          cpu\"
    }"""

    Note that the above command reuses the same definition for the iot_in table as the previous example, but defines a new Apache Flink® table called iot_avg_out pointing to the topic iot-avg-output in JSON format. The iot_avg_out schema includes the window_start and window_end columns defining the boundaries of the window calculated on event time, the IoT device hostname and cpu, and the aggregated columns for average and maximum usage.

    The statement section defines the 5-minute time windows based on the event time column time_ltz with the TUMBLE option (check out the dedicated windows documentation), and performs the aggregations. We can notice again that, apart from the TUMBLE-specific syntax, the SQL statement is very similar to one we could use in a relational database.

  4. Get the application version id with:

    FLINK_APP_VERSION_ID=$(avn service flink get-application demo-flink \
      --project $AVN_PROJECT_NAME \
      --application-id $FLINK_APP_ID \
      | jq -r '.application_versions[] | select(.version == 1) | .id')
  5. Deploy the application with:

    avn service flink create-application-deployment demo-flink \
      --project $AVN_PROJECT_NAME \
      --application-id $FLINK_APP_ID \
      "{\"parallelism\": 1,\"restart_enabled\": true, \"version_id\": \"$FLINK_APP_VERSION_ID\"}"
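To make the TUMBLE semantics concrete, here is a small Python sketch (an illustration, not Flink code) that buckets samples into aligned 5-minute windows per hostname and cpu and computes the same avg and max aggregations as the statement above:

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)

def tumble_aggregate(samples):
    """Mimic the TUMBLE + GROUP BY statement: assign each sample to its
    aligned 5-minute window keyed by (hostname, cpu), then compute the
    average and maximum usage per window."""
    buckets = defaultdict(list)
    for s in samples:
        ts = datetime.fromisoformat(s["time_ltz"])
        # Align the timestamp down to the enclosing 5-minute boundary
        start = ts - timedelta(minutes=ts.minute % 5, seconds=ts.second,
                               microseconds=ts.microsecond)
        buckets[(start, s["hostname"], s["cpu"])].append(s["usage"])
    return [
        {"window_start": start.isoformat(sep=" "),
         "window_end": (start + WINDOW).isoformat(sep=" "),
         "hostname": host, "cpu": cpu,
         "avg_usage": sum(vals) / len(vals), "max_usage": max(vals)}
        for (start, host, cpu), vals in buckets.items()
    ]
```

In the real pipeline, Flink maintains these per-key window accumulators incrementally and emits one row per (window, hostname, cpu) group once the watermark passes the window end.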

We can check the deployment status with:

avn service flink get-application demo-flink \
  --project $AVN_PROJECT_NAME \
  --application-id $FLINK_APP_ID | jq '.current_deployment.status'

Once the status is RUNNING, we can check the aggregated results being pushed to the iot-avg-output Apache Kafka® topic using kcat and the following command:

kcat -F kcat.config -C -t iot-avg-output

The result should be similar to the following, showing the window start and end time, the IoT device hostname and cpu, and the aggregated utilization metrics:

{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"dopey","cpu":"cpu2","avg_usage":83.75267676383785,"max_usage":98.69540798403519}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"happy","cpu":"cpu2","avg_usage":86.92580172018437,"max_usage":99.19071554040988}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"happy","cpu":"cpu1","avg_usage":88.89410118795028,"max_usage":98.63471212455435}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"dopey","cpu":"cpu4","avg_usage":84.0739509212965,"max_usage":98.74529247031236}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"grumpy","cpu":"cpu4","avg_usage":79.39103768789909,"max_usage":91.3979757069514}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"sneezy","cpu":"cpu2","avg_usage":84.2944495941075,"max_usage":99.55062006322083}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"grumpy","cpu":"cpu3","avg_usage":82.57313518548608,"max_usage":99.47190111994642}
{"window_start":"2023-02-08 10:20:00","window_end":"2023-02-08 10:25:00","hostname":"sneezy","cpu":"cpu4","avg_usage":86.71661332259018,"max_usage":99.61266191120228}

Now that we have created our 5-minute windows topic, we could iterate again on our monitoring solution by adding a follow-up Apache Flink® job that filters only the data going over a threshold. The beauty of this approach is that we can plug in one step of our solution at a time, storing the intermediate data in the common bus represented by Apache Kafka®.
Furthermore, since the data is stored in an Apache Kafka® topic, we could use Apache Kafka Connect® to sink it to OpenSearch® and use OpenSearch Dashboards or Grafana® to visualize the trends.

With the combination of Apache Flink®, Apache Kafka®, and Apache Kafka Connect® we have endless possibilities in terms of streaming analytics and integrations.

Build a streaming anomaly detection system

Use Aiven for Apache Flink® for data transformation, Aiven for Apache Kafka® for data streaming, and Aiven for PostgreSQL® for data storage/query.


Next steps

Apache Kafka® and Apache Flink® allow you to move away from batch processing and embrace streaming, while keeping a familiar SQL interface for the pipeline definitions. Even more, Apache Flink®'s rich SQL syntax allows you to define aggregations, boundaries and temporal limits that would be somewhat hard to define in traditional databases. To learn a bit more about this open source combination and the related managed services Aiven is offering:

If you're not using Aiven services yet, go ahead and sign up now for your free trial at https://console.aiven.io/signup!

Further reading