Apache Flink® SQL client on Docker

Pure SQL pipelines using Apache Flink®? Learn how to set them up with a local Docker-based platform for Flink, including an SQL client!

Apache Flink® is an open source framework for data processing in both stream and batch mode. It supports a variety of data platforms, including Apache Kafka® and any JDBC database. Flink's superpowers come in a variety of languages: from the more traditional Java and Scala, all the way to Python. For Python there is PyFlink, which you can combine with the best machine learning libraries available.

However, in the data world, one of the most commonly-known languages is SQL. From data analysts, to scientists and engineers, SQL is usually part of the toolkit of data practitioners. SQL provides an abstraction to data access and manipulation that goes beyond technologies and trends. You can use it to query your data no matter what the underlying tech is (with minor dialect differences).

With Apache Flink, you can define whole data pipelines in pure SQL using its SQL Client. This blog post will get you set up with a local Docker-based platform for Apache Flink, including an SQL client.

As mentioned above, Apache Flink is a very interesting technology and worth trying out. When evaluating a new tool, running it locally is a double-edged sword. On the one hand, you get the benefit of understanding its inner mechanisms better. On the other, you also get the pain of the setup phase.

To skip the setup pain, try Docker. It offers tools as pre-packaged solutions portable to any host OS with minimal effort.

When looking for content about Flink's SQL client, I found a demo on Apache Flink's website, which is quite good but not suited to casual experimentation. That's why I've created a lighter tutorial. This article, with its related GitHub repository, takes the minimal building blocks and provides basic Flink functionality, expecting any data pipeline source or target to be available outside the containers. Before starting, make sure that both Docker and Docker Compose are installed.

The whole code is contained in the Aiven-Open/sql-cli-for-apache-flink-docker repository, which we can clone with the following call in our terminal:

git clone https://github.com/Aiven-Open/sql-cli-for-apache-flink-docker.git

Now let's open the sql-cli-for-apache-flink-docker folder and start the containers with Docker Compose:

cd sql-cli-for-apache-flink-docker
docker-compose up -d

This will start 3 Apache Flink nodes in the background: a jobmanager, a taskmanager and the sql-client. We can review the details of the cluster like this:

docker-compose ps

This should show the three containers in the Up state:

Name                                            Command                          State   Ports
-------------------------------------------------------------------------------------------------------------------------
sql-cli-for-apache-flink-docker_jobmanager_1    /docker-entrypoint.sh jobm ...   Up      6123/tcp, 0.0.0.0:8081->8081/tcp
sql-cli-for-apache-flink-docker_taskmanager_1   /docker-entrypoint.sh task ...   Up      6123/tcp, 8081/tcp
sql-client                                      /docker-entrypoint.sh            Up      6123/tcp, 8081/tcp

Flink's web UI is now available at localhost:8081. This is a useful tool for browsing information about Flink's status and the data pipelines we're going to create.
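
The containers can take a few seconds to come up, so the web UI may not answer straight away. As a minimal sketch, assuming curl is installed on the host, a small polling helper can wait for it. The function name wait_for_url and its default attempt count and delay are illustrative choices, not part of the repository.

```shell
# Poll a URL until it answers successfully or the attempts run out.
# Arguments: URL, number of attempts (default 30), seconds between attempts (default 2).
# wait_for_url is a hypothetical helper name used only for this sketch.
wait_for_url() {
  url=$1
  attempts=${2:-30}
  delay=${3:-2}
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # -f: treat HTTP errors as failures; -s: silent; -o /dev/null: discard the body
    if curl -fs -o /dev/null "$url"; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Usage, once docker-compose up -d has been run:
#   wait_for_url http://localhost:8081 && echo "Flink web UI is reachable"
```

The helper returns a non-zero status if the UI never responds, which makes it easy to chain with other commands in a script.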

Notes about setting up Docker Compose

In docker-compose.yml we map the settings subfolder to the /settings folder of the jobmanager Docker container. This way, settings files can be passed between host and guest, which is helpful if specific host-generated files (e.g. keystores) are needed for authentication.

The data subfolder is also mapped to the taskmanager and jobmanager containers. This is purely because I needed it for the SQL example below, but it could also be useful in cases where we want to test Apache Flink behaviour against files in a local file system. The data subfolder contains a test.csv file with made-up data that we'll use to test Apache Flink.
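
To experiment with your own files, you can drop them into the same data subfolder. The sketch below writes a small headerless CSV with the id, name, job, salary columns. The helper name write_sample_csv and the file name my_people.csv are made up for this example, the rows mirror the query output shown later in this post, and the real test.csv in the repository may be laid out differently.

```shell
# Write a small headerless CSV (id,name,job,salary) into the given directory.
# write_sample_csv and my_people.csv are hypothetical names for this sketch.
write_sample_csv() {
  dir=$1
  mkdir -p "$dir"
  cat > "$dir/my_people.csv" <<'EOF'
1,Ugo,Football Player,200000
2,Carlo,Crocodile domesticator,30000
3,Maria,Software Engineer,210000
4,Sandro,UX Designer,70000
5,Melissa,Software Engineer,95000
EOF
}

# Usage from the repository root (assumes ./data is the mapped subfolder):
#   write_sample_csv ./data
```

Assuming the mapping matches the path used in the table definition in this post, the file should then be visible next to test.csv under /data inside the containers.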

Unleashing the power of SQL

If we want to play with Flink's SQL, we need to enter the sql-client container. We can do that by executing the following command in the terminal:

docker exec -it sql-client /bin/bash

Now that we're in, we can start Flink's SQL client with:

./sql-client.sh

There we are! We have a fully functional SQL client that we can use to create data pipelines attaching to a variety of data sources and targets. As a small demo, we can query the test.csv file within the sql-cli-for-apache-flink-docker/data folder by defining the associated Flink table within the SQL Client:

create table people_job (
    id INT,
    name STRING,
    job STRING,
    salary BIGINT
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///data/test.csv',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'
);

And we can query the table:

select * from people_job;

With the following results:

+/-   id   name      job                      salary
  +    1   Ugo       Football Player          200000
  +    2   Carlo     Crocodile domesticator    30000
  +    3   Maria     Software Engineer        210000
  +    4   Sandro    UX Designer               70000
  +    5   Melissa   Software Engineer         95000

Exit Flink's table view by pressing Q.

Create a Data Target: PostgreSQL®

Let's assume that we want to push the aggregated average salary and people count by job to a PostgreSQL® table. If you don't have a PostgreSQL instance, you can quickly create one with the following Aiven CLI command in a new terminal window:

avn service create pg-flink \
  -t pg \
  --cloud google-europe-west3 \
  --plan startup-4

This creates a PostgreSQL instance (-t pg), in google-europe-west3 with a startup-4 plan. Let's wait for the service to be ready:

avn service wait pg-flink

Let's create a target table job_summary that we'll use to push data from Flink. From the same terminal window we can execute the following:

avn service cli pg-flink

and then

create table job_summary (
    job VARCHAR PRIMARY KEY,
    avg_salary BIGINT,
    nr_people BIGINT
);

Create the SQL pipeline

Now it's time to retrieve the connection parameters to PostgreSQL with the following command in a new terminal window:

avn service get pg-flink \
  --format '{service_uri_params}'

The output should be similar to the following:

{
    'dbname': 'defaultdb',
    'host': '<hostname>.aivencloud.com',
    'password': '<password>',
    'port': '13039',
    'sslmode': 'require',
    'user': 'avnadmin'
}

Take note of the host, port, user, dbname and password details above and use them to create a Flink table pointing to PostgreSQL. Paste the following SQL into Flink's SQL Client, replacing the <host>, <port>, <dbname>, <username> and <password> placeholders with the values you just noted (the user value goes into <username>).

create table job_summary_flink (
    job STRING,
    avg_salary BIGINT,
    nr_people BIGINT,
    PRIMARY KEY (job) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<host>:<port>/<dbname>?sslmode=require',
    'table-name' = 'job_summary',
    'username' = '<username>',
    'password' = '<password>'
);
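
The url option in the table definition follows the pattern jdbc:postgresql://<host>:<port>/<dbname>?sslmode=require. To avoid typos when copying values by hand, the URL can be assembled in the shell first. This is only a sketch: jdbc_url is a hypothetical helper name, and the values passed to it are the placeholders from the sample output above, not a real service.

```shell
# Build the JDBC URL for the 'url' option from host, port and database name.
# jdbc_url is a hypothetical helper name used only for this sketch.
jdbc_url() {
  printf 'jdbc:postgresql://%s:%s/%s?sslmode=require\n' "$1" "$2" "$3"
}

# Example with the placeholder values from the sample output:
jdbc_url "<hostname>.aivencloud.com" 13039 defaultdb
```

The printed string can be pasted as the value of 'url' in the create table statement.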

Now let's create the SQL pipeline with the following command in Flink SQL Client:

insert into job_summary_flink
select job,
    avg(salary),
    count(*)
from people_job
group by job;

We should see an output from the SQL Client like this:

[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: b2d8b019c6c6e3dc5fe63902a14c13a9

We can now check that Flink's table has been correctly populated with the following SQL from Flink's SQL Client:

select * from job_summary_flink;

which results in

+/-   job                      avg_salary   nr_people
  +   UX Designer                   70000           1
  +   Crocodile domesticator        30000           1
  +   Football Player              200000           1
  +   Software Engineer            152500           2
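
To double-check the numbers outside Flink, the same grouping can be reproduced with awk over the five rows from the earlier query output. This is a rough sketch rather than what Flink does internally: summarise_jobs is a hypothetical helper name, it assumes comma-separated input without a header row, and it sorts the result by job name, so the row order differs from the output above.

```shell
# Group "id,name,job,salary" rows by job and print job,average salary,head count.
# summarise_jobs is a hypothetical helper name used only for this sketch.
summarise_jobs() {
  awk -F',' '
    { total[$3] += $4; people[$3] += 1 }
    END {
      for (job in total)
        printf "%s,%d,%d\n", job, total[job] / people[job], people[job]
    }
  ' | sort
}

# The five rows returned by the people_job query earlier in this post:
summarise_jobs <<'EOF'
1,Ugo,Football Player,200000
2,Carlo,Crocodile domesticator,30000
3,Maria,Software Engineer,210000
4,Sandro,UX Designer,70000
5,Melissa,Software Engineer,95000
EOF
```

The two Software Engineer rows average to (210000 + 95000) / 2 = 152500, which matches the avg_salary column.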

We can also check that PostgreSQL returns the same result by running the following command in the PostgreSQL client terminal window:

select * from job_summary;

This results in:

          job           | avg_salary | nr_people
------------------------+------------+-----------
 UX Designer            |      70000 |         1
 Crocodile domesticator |      30000 |         1
 Football Player        |     200000 |         1
 Software Engineer      |     152500 |         2
(4 rows)

Wrapping up

This article provides a way to get Apache Flink's SQL Client as a set of Docker containers. It expects additional data sources or targets to be available outside the containers and can be used to start the learning journey into Flink. The SQL pipeline example shows an integration between a local CSV file and PostgreSQL.
