Streamline SQL Pipeline with Apache Flink® and Apache Kafka®

Apache Kafka® is the perfect base for a streaming application. Apache Flink® has the power of stateful data transformations. Together, they move data!

  • Apache Kafka®
  • Apache Flink®
  • Tutorial

Apache Kafka® is the perfect base for any streaming application: a solid, highly-available, fault-tolerant platform that makes reliable communication between streaming components as easy as writing to a disk.

Apache Flink® adds the power of stateful data transformations to the picture. It's able to calculate, persist, recover and process data in a similar distributed, highly-available, fault-tolerant fashion to that provided by Kafka. Apache Flink can be used from a variety of languages: from the more traditional Java and Scala all the way to Python and SQL.

The article Apache Flink® SQL client on Docker shows how you can create your Docker version of Apache Flink including its SQL Client. In this article, we will demonstrate how you can use the best streaming combination — Apache Flink and Kafka — to create pipelines defined using data practitioners' favourite language: SQL!

Set up Apache Kafka

Apache Kafka is our basic data storage platform. We can create a cluster via Aiven's Command line interface in our terminal:

    avn service create -p business-4 \
      -t kafka demo-kafka \
      --cloud google-europe-west3 \
      -c kafka_rest=true \
      -c kafka.auto_create_topics_enable=true

This sets up an Apache Kafka cluster named demo-kafka in google-europe-west3, enabling Kafka REST APIs and topic auto creation. If you want to wait until the demo-kafka service is ready to use, you can use the following command:

    avn service wait demo-kafka

Set up Apache Flink on Docker

In Apache Flink® SQL client on Docker I outlined how to properly set up Apache Flink on Docker. With Docker, we can have a working environment running in minutes without needing to fiddle with installation and configuration. The previous post provides instructions on how to set up such an environment and how to create a file-to-PostgreSQL data pipeline. In this article, I'm going to assume that you correctly configured Apache Flink, and that the service is up and running.

Here's a quick summary of the required steps if you didn't follow the previous post. Clone the Aiven-Open/sql-cli-for-apache-flink-docker repository with the following command in your terminal:

    git clone https://github.com/Aiven-Open/sql-cli-for-apache-flink-docker.git

Now move into the sql-cli-for-apache-flink-docker folder

    cd sql-cli-for-apache-flink-docker

and start the containers with Docker Compose:

    docker-compose up -d

When that's finished, you can run

    docker-compose ps

and you should get output like this:

    NAME                                            IMAGE                                        COMMAND                               SERVICE       CREATED         STATUS         PORTS
    sql-cli-for-apache-flink-docker-jobmanager-1    flink:1.17.1-scala_2.12-java11               "/docker-entrypoint.sh jobmanager"    jobmanager    3 minutes ago   Up 3 minutes   6123/tcp, 0.0.0.0:8081->8081/tcp
    sql-cli-for-apache-flink-docker-taskmanager-1   flink:1.17.1-scala_2.12-java11               "/docker-entrypoint.sh taskmanager"   taskmanager   3 minutes ago   Up 3 minutes   6123/tcp, 8081/tcp
    sql-client                                      sql-cli-for-apache-flink-docker-sql-client   "/docker-entrypoint.sh"               sql-client    3 minutes ago   Up 3 minutes   6123/tcp, 8081/tcp

This tells you that Flink's job manager, task manager and sql-client containers are all ready to be used.

Note: Docker is great for testing or development, but for production workloads you might want to use more reliable managed services like Aiven for Apache Kafka® and Aiven for Apache Flink®.

Create a Keystore for Kafka's SSL certificates

Aiven for Apache Kafka enables SSL authentication by default. To safely connect to it from Apache Flink, we need to use the Java Keystore and Truststore. We can generate them with the following command in our terminal, assuming we are in the sql-cli-for-apache-flink-docker folder you created in the previous steps:

    avn service user-kafka-java-creds demo-kafka \
      --username avnadmin \
      -d settings/certs \
      -p password123

The command creates a folder named certs under settings and stores the certificate files there, together with a keystore and a truststore (named client.keystore.p12 and client.truststore.jks respectively), both secured with the password password123.

Create some test data with kcat

Now we can use kcat to create some data. After installing it, use the following call to find the <host> and <port> parameters:

    avn service get demo-kafka --format '{service_uri_params}'

and then create a file kcat.config under our settings/certs folder with the following content, replacing <host> and <port> with the values we just found:

    bootstrap.servers=<host>:<port>
    security.protocol=ssl
    ssl.key.location=service.key
    ssl.certificate.location=service.cert
    ssl.ca.location=ca.pem

Now open a new terminal and navigate to the settings/certs folder.

If you're on a Mac and running kcat 1.7.0 or earlier, then see the note below. Otherwise, you can run:

    kcat -F kcat.config -P -t people

kcat will read messages from standard input and send each line to Kafka as a message as soon as it sees a newline. The -t people flag tells it to send to the people topic.

Paste the following lines into the terminal:

    {"name":"Jon","country":"USA","age":40}
    {"name":"Ava","country":"England","age":35}
    {"name":"Pino","country":"Italy","age":25}
    {"name":"Carla","country":"Italy","age":45}

Four messages have been sent to the people topic in our Apache Kafka environment. Keep this terminal window open - you'll use it again later to insert more messages.

Note: On a Mac with kcat 1.7.0 or earlier, messages are buffered, so they will not be sent immediately. Instead of using the interactive input suggested above, do:
    kcat -F kcat.config -P -t people <<EOF
    {"name":"Jon","country":"USA","age":40}
    {"name":"Ava","country":"England","age":35}
    {"name":"Pino","country":"Italy","age":25}
    {"name":"Carla","country":"Italy","age":45}
    EOF
If you're on a Mac with kcat 1.7.0 or earlier and using the fish shell, that won't work either; either temporarily switch to zsh or write the JSON to a file and use -l <filename> to read the lines from that file.

Define the source Kafka topic as a Flink Table

As mentioned in the previous post, we can enter Flink's sql-client container to create a SQL pipeline by executing the following command in a new terminal window:

    docker exec -it sql-client /bin/bash

Now we're in, and we can start Flink's SQL client with

    ./sql-client.sh

Define a source for the people Kafka topic with the following code (replace the <host> and <port> parameters to correctly point to Kafka as we did for the kcat configuration):

    CREATE TABLE people_source (
        name VARCHAR,
        country VARCHAR,
        age INT
    ) WITH (
        'connector' = 'kafka',
        'property-version' = 'universal',
        'properties.bootstrap.servers' = '<host>:<port>',
        'topic' = 'people',
        'scan.startup.mode' = 'earliest-offset',
        'value.format' = 'json',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.endpoint.identification.algorithm' = '',
        'properties.ssl.truststore.location' = '/settings/certs/client.truststore.jks',
        'properties.ssl.truststore.password' = 'password123',
        'properties.ssl.keystore.type' = 'PKCS12',
        'properties.ssl.keystore.location' = '/settings/certs/client.keystore.p12',
        'properties.ssl.keystore.password' = 'password123',
        'properties.ssl.key.password' = 'password123',
        'properties.group.id' = 'my-working-group'
    );

The command above defines a Flink table named people_source with the following properties:

  • Three columns: name, country and age
  • Connecting to Apache Kafka (connector = 'kafka')
  • Reading from the start (scan.startup.mode) of the topic people (topic), whose format is JSON (value.format), with the consumer being part of the my-working-group consumer group (properties.group.id).
  • Connecting via the bootstrap.servers and using the SSL security protocol (properties.security.protocol) with the client.truststore.jks and client.keystore.p12 stores.

After executing it, we should see [INFO] Execute statement succeed.

Please note that this doesn't mean it's working! We can test it properly by actually looking to see what is in the table.

First, run the following command at the sql-client terminal (we'll come back to explain it later on):

    SET sql-client.execution.result-mode=changelog;

Follow that with the command:

    SELECT * FROM people_source;

This will result in output like the following:

    op    name     country    age
    --    ----     -------    ---
    +I    Jon      USA         40
    +I    Ava      England     35
    +I    Pino     Italy       25
    +I    Carla    Italy       45

To leave Flink's table view, press Q.

If you get an AccessDeniedException on Linux

If you're on Linux, you'll probably hit an error like this:

    [ERROR] Could not execute SQL statement. Reason:
    java.nio.file.AccessDeniedException: /settings/certs/client.keystore.p12

This error is caused by a couple of factors:

  • The generated client.keystore.p12 file is by default only readable by the user who created it (-rw-------)
  • The way docker-compose mounts the volumes: the folder where the keystore files reside is owned by the host user (uid 1000)

The combination of the two makes the file client.keystore.p12 inaccessible by Flink (executed by user flink with uid 9999). To solve the problem, make the keystore readable by the flink user by redefining the folder ownership:

In the sql-cli-for-apache-flink-docker folder on your host, run the following command to find the flink user's id:

    docker exec sql-cli-for-apache-flink-docker-taskmanager-1 id flink

The result should be similar to this:

    uid=9999(flink) gid=9999(flink) groups=9999(flink)

Now we can use flink's uid to set the ownership of the settings folder and all its contents, by executing the following command in the same terminal window (replacing 9999 with flink's uid from the above call if necessary):

    sudo chown -R 9999 ./settings

After executing it, retry the SELECT * FROM people_source; statement. It should now succeed.

Transform and insert data

Now it's time to see the beauty of Flink in action: we're going to set up a process that analyses streaming data coming from the people Kafka topic, calculates some aggregated KPIs and publishes them to a target datastore, in our case a new Kafka topic. And we're doing everything using only SQL statements.

Flink is so flexible that you can run a similar exercise with a huge variety of technologies as sources or targets. The Kafka examples shown in this blog could be replaced with any JDBC database, local files, OpenSearch or Hive with only a few changes in our SQL definitions. The list of supported connectors can be found on Flink's website.
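For example, here is a sketch (not part of this tutorial's setup) of how the same people records could be read from local newline-delimited JSON files instead of Kafka, using Flink's filesystem connector. The /data/people path and the table name people_source_files are hypothetical:

    -- Sketch only: reads the same people records from local JSON files instead of Kafka.
    -- The /data/people path is hypothetical and not created anywhere in this tutorial.
    CREATE TABLE people_source_files (
        name VARCHAR,
        country VARCHAR,
        age INT
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/data/people',
        'format' = 'json'
    );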

Define the target Kafka topic as a Flink table

For the purposes of this article, let's assume we want to push aggregated data to a new Kafka topic containing the average age and number of people in a specific country. To do so, we first need to define Flink's target table structure with the following code in Flink's sql-cli terminal window (replacing, as before, the <host>:<port> section with Kafka's endpoint):

    CREATE TABLE country_target (
        country VARCHAR,
        avg_age BIGINT,
        nr_people BIGINT,
        PRIMARY KEY (country) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'property-version' = 'universal',
        'properties.bootstrap.servers' = '<host>:<port>',
        'topic' = 'country_agg',
        'value.format' = 'json',
        'key.format' = 'json',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.endpoint.identification.algorithm' = '',
        'properties.ssl.truststore.location' = '/settings/certs/client.truststore.jks',
        'properties.ssl.truststore.password' = 'password123',
        'properties.ssl.keystore.type' = 'PKCS12',
        'properties.ssl.keystore.location' = '/settings/certs/client.keystore.p12',
        'properties.ssl.keystore.password' = 'password123',
        'properties.ssl.key.password' = 'password123',
        'properties.group.id' = 'my-working-group'
    );

The above SQL creates a Flink table named country_target with three columns: country (the primary key), avg_age and nr_people. The connector is upsert-kafka, since we always want to update the topic with the most recent version of the KPIs per country (PRIMARY KEY (country)). The WITH clause specifies that we will push data to the country_agg Kafka topic using the same connection properties as the people_source table.

What's even cooler about this is that with a few small amendments to the WITH clause above, we could publish the result of our data pipeline to a completely different technology endpoint. An example of a Flink table definition over a database is provided in the article Apache Flink SQL client on Docker.
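To give a flavour of what that could look like, here is a sketch of a JDBC sink table pointing at a PostgreSQL database. The host, port, database and credentials below are placeholders, and the Flink JDBC connector and PostgreSQL driver JARs would need to be available on the Flink cluster for this to work:

    -- Sketch only: a JDBC sink as an alternative target for the same aggregated KPIs.
    -- All connection values below are placeholders; the target table must already
    -- exist in PostgreSQL with matching columns.
    CREATE TABLE country_target_pg (
        country VARCHAR,
        avg_age BIGINT,
        nr_people BIGINT,
        PRIMARY KEY (country) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://<pg-host>:<pg-port>/<database>',
        'table-name' = 'country_agg',
        'username' = '<pg-user>',
        'password' = '<pg-password>'
    );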

Set up the data pipeline

Once the country_target destination endpoint is defined, we can finally create the SQL pipeline by defining the query aggregation logic and related insert statement. The following code provides exactly what we need, so we can paste it in Flink's sql-cli terminal window:

    INSERT INTO country_target
    SELECT country, avg(age), count(*)
    FROM people_source
    GROUP BY country;

That may give a warning message like the following, which can be safely ignored (it's been reported since at least 2019, and may or may not get fixed in the future):

    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist-1.17.1.jar) to field java.lang.String.value
    WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release

We should then receive a message telling us that our SQL pipeline was successfully deployed, like this:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] Table update statement has been successfully submitted to the cluster:
    Job ID: 95b57225d702ab9c9402daba10fe6a84

Now if we query the country_target table from Flink's SQL client with:

    SELECT * FROM country_target;

We should see something like this:

    op    country    avg_age    nr_people
    --    -------    -------    ---------
    +I    USA             40            1
    +I    England         35            1
    +I    Italy           25            1
    -U    Italy           25            1
    +U    Italy           35            2

This tells us that we have one entry each for USA and England as expected, but three entries for Italy - which is weird, since we only pushed two records for that country. This is because we earlier selected the Flink sql-client's changelog result mode, so the above result tells us that for Italy we had, in order:

  1. An insert entry for Italy with 1 person and an average age of 25 -> +I (insert)
  2. Deletion of entry #1 -> -U (update/delete)
  3. A new insert entry for Italy with 2 people and an average age of 35 -> +U (update/insert)

Flink's changelog view is great if we want to check how KPIs have been calculated over time. On the other hand, if we just want to browse the up-to-date situation we can move to Flink's table result mode by executing the following in Flink's sql-cli terminal:

    SET sql-client.execution.result-mode=table;

And now, when you say

    SELECT * FROM country_target;

You should just see the current situation:

    country    avg_age    nr_people
    -------    -------    ---------
    USA             40            1
    England         35            1
    Italy           35            2

Check the pipeline output

Now we want to verify that the Flink records have been successfully written to the desired Kafka topic. From a new terminal window, go to the sql-cli-for-apache-flink-docker/settings/certs directory and run this:

    kcat -F kcat.config -C -t country_agg

The command will start kcat in consumer mode (-C), listening on the topic country_agg (the same one we used in Flink's table definition). The output will be the list of updated records for the various KPIs:

    {"country":"USA","avg_age":40,"nr_people":1}
    {"country":"England","avg_age":35,"nr_people":1}
    {"country":"Italy","avg_age":25,"nr_people":1}
    {"country":"Italy","avg_age":35,"nr_people":2}
    % Reached end of topic country_agg [0] at offset 4

If we now go back to the terminal window where we were running kcat as a producer, we can add a row to our people topic:

    {"name":"Mark","country":"England","age":37}
If you're on a Mac running kcat version 1.7.0 or earlier, you'll need to use the same workaround you used to run the original kcat producer command, back in Create some test data with kcat, but with the message {"name":"Mark","country":"England","age":37}.

We can immediately see the streaming pipeline in action with a new line appearing in the country_agg Kafka topic on the kcat consumer terminal, containing the updated avg_age and nr_people KPIs:

    {"country":"England","avg_age":36,"nr_people":2}
    % Reached end of topic country_agg [0] at offset 5

Wow, we just built a whole analytics pipeline!

We started by inserting JSON records into a Kafka topic with kcat, representing our streaming input. The topic was then registered in Flink, which we configured to transform and aggregate the data. The output was finally stored in a new Kafka topic.

The whole pipeline was built with just three SQL statements and, with minor changes, we could quickly swap the data source or target using Flink as an "abstraction layer" on top of our data technology. This was a very simple use case, but Flink can be a game changer in a huge variety of situations. Your batch ETL now seems a bit dated, doesn't it?

Summary: We built a streaming pipeline in pure SQL

SQL is the best known and most loved language among data practitioners. The union of Apache Kafka and Apache Flink provides a simple, highly available and scalable toolset that allows them to focus on building real-time data pipelines rather than learning and debugging complex code. Flink's SQL capabilities build on all the benefits of Kafka-based data hubs, adding the ability to join in external data assets and deliver pipeline output to a huge variety of targets.
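As an illustration of that last point, here is a sketch (not part of the pipeline built above) of how the people_source stream could be joined with a hypothetical countries lookup table, still in pure SQL. The countries table and its columns are assumptions and would need their own CREATE TABLE definition over, say, a JDBC or Kafka source before this could run:

    -- Sketch only: countries is a hypothetical lookup table, not defined in this article.
    SELECT p.name,
           p.age,
           c.continent
    FROM people_source AS p
    JOIN countries AS c
      ON p.country = c.country_name;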

Additional resources that you might find interesting:

  • An introduction to Flink on the Aiven blog
  • An introduction to event-driven architecture
  • Apache Flink SQL client on Docker, also on the Aiven blog
  • Flink SQL Client Documentation - to understand Flink SQL client functionality
  • Flink - Apache Kafka SQL Connector - to check Apache Kafka table definition
  • Flink - Apache Kafka Upsert SQL Connector - to review Apache Kafka Upsert parameters
  • Aiven Console - to create and manage your Apache Kafka cluster

Next steps

Your next step could be to check out Aiven for PostgreSQL and Aiven for Apache Flink.

If you're not using Aiven services yet, go ahead and sign up now for your free trial at https://console.aiven.io/signup!

In the meantime, make sure you follow our changelog and RSS feeds or our LinkedIn and Twitter accounts to stay up-to-date with product and feature-related news.
