Apache Flink® SQL client on Docker
Pure SQL pipelines using Apache Flink®? Learn how to set them up with a local Docker-based platform for Flink, including an SQL client!
Apache Flink® is an open-source framework for data processing in both stream and batch mode. It supports a variety of different data platforms, including Apache Kafka® and any JDBC database. Flink's superpowers come in a variety of languages: from the more traditional Java and Scala, all the way to Python. For Python there is PyFlink, which you can use with the best machine learning libraries available.
However, in the data world, one of the most commonly known languages is SQL. From data analysts to scientists and engineers, SQL is usually part of the toolkit of data practitioners. SQL provides an abstraction to data access and manipulation that goes beyond technologies and trends. You can use it to query your data no matter what the underlying tech is (with minor dialect differences).
With Apache Flink, you can define whole data pipelines in pure SQL using its SQL Client. This blog post will get you set up with a local Docker-based platform for Apache Flink, including an SQL client.
Set up Apache Flink locally with Docker Compose
As mentioned above, Apache Flink is a very interesting technology and worth trying out. When evaluating a new tool, running it locally is a double-edged sword. On the one hand, you get the benefit of understanding its inner mechanisms better. On the other, you also get the pain of the setup phase.
To skip the setup pain, try Docker. It offers tools as pre-packaged solutions that are portable to any host OS with minimal effort.
While looking for material on Flink's SQL client, I found a demo on Apache Flink's website. It is quite good, but not suited to casual experimentation, so I've created a lighter tutorial. This article, with its related GitHub repository, takes the minimal building blocks and provides basic Flink functionality, expecting any data pipeline source or target to be available outside the containers. Before starting, make sure that both Docker and Docker Compose are installed.
The whole code is contained in the Aiven-Open/sql-cli-for-apache-flink-docker repository, which we can clone with the following command in our terminal:
git clone https://github.com/Aiven-Open/sql-cli-for-apache-flink-docker.git
Now let's open the sql-cli-for-apache-flink-docker folder and start Docker Compose:
cd sql-cli-for-apache-flink-docker
docker-compose up -d
This will start 3 Apache Flink nodes in the background: a jobmanager, a taskmanager and the sql-client. We can review the details of the cluster like this:
docker-compose ps
This should show the three containers in the Up state:
Name                                            Command                          State   Ports
-------------------------------------------------------------------------------------------------------------------------
sql-cli-for-apache-flink-docker_jobmanager_1    /docker-entrypoint.sh jobm ...   Up      6123/tcp, 0.0.0.0:8081->8081/tcp
sql-cli-for-apache-flink-docker_taskmanager_1   /docker-entrypoint.sh task ...   Up      6123/tcp, 8081/tcp
sql-client                                      /docker-entrypoint.sh            Up      6123/tcp, 8081/tcp
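If we want to script this check rather than read the table by eye, a minimal sketch could look like the following. The count_up function is a hypothetical helper, not part of the repository, and it assumes the State column layout shown above; other Docker Compose versions may print the status differently.

```shell
#!/bin/sh
# Hypothetical helper (not part of the repository): counts the lines read
# from stdin that contain the word "Up" surrounded by spaces.
# Assumption: the input uses the column layout printed by docker-compose ps above.
count_up() {
  # grep -c prints the number of matching lines; "|| true" keeps the exit
  # status at zero even when no line matches (grep still prints 0).
  grep -c ' Up ' || true
}

# Usage, once the containers are started:
#   docker-compose ps | count_up
```

With the three containers running, the usage line should report 3.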
Flink's web UI is now available at localhost:8081. This is a useful tool for browsing information about Flink's status and the data pipelines we're going to create.
Notes about setting up Docker Compose
In docker-compose.yml we map the settings subfolder to the /settings folder of the jobmanager Docker container. This way, settings files can be passed between host and guest, which is helpful if specific host-generated files (e.g. keystores) are needed for authentication.
The data subfolder is also mapped to the taskmanager and jobmanager containers. I did this to support the SQL example below, but it could also be useful when we want to test Apache Flink's behaviour against files in a local file system. The data subfolder contains a test.csv file with made-up data that we'll use to test Apache Flink.
Unleashing the power of SQL
If we want to play with Flink's SQL, we need to enter the sql-client container. We can do that by executing the following command in the terminal:
docker exec -it sql-client /bin/bash
Now that we're in, we can start Flink's SQL client with:
./sql-client.sh
There we are! We have a fully functional SQL client that we can use to create data pipelines attaching to a variety of data sources and targets. As a little demo, we can query the test.csv file in the sql-cli-for-apache-flink-docker/data folder by defining the associated Flink table within the SQL Client:
create table people_job (
    id INT,
    name STRING,
    job STRING,
    salary BIGINT
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///data/test.csv',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'
);
And we can query the table:
select * from people_job;
With the following results:
+/-   id   name      job                      salary
+     1    Ugo       Football Player          200000
+     2    Carlo     Crocodile domesticator   30000
+     3    Maria     Software Engineer        210000
+     4    Sandro    UX Designer              70000
+     5    Melissa   Software Engineer        95000
Exit Flink's table view by pressing Q.
Create a Data Target: PostgreSQL®
Let's assume that we want to push the average salary and the people count, aggregated by job, to a PostgreSQL® table. If you don't have a PostgreSQL instance, you can quickly create one with the following Aiven CLI command in a new terminal window:
avn service create pg-flink \
    -t pg \
    --cloud google-europe-west3 \
    --plan startup-4
This creates a PostgreSQL instance (-t pg) in google-europe-west3 with a startup-4 plan. Let's wait for the service to be ready:
avn service wait pg-flink
Let's create a target table job_summary that we'll use to push data from Flink. From the same terminal window we can execute the following:
avn service cli pg-flink
and then
create table job_summary (
    job VARCHAR PRIMARY KEY,
    avg_salary BIGINT,
    nr_people BIGINT
);
Create the SQL pipeline
Now it's time to retrieve the connection parameters to PostgreSQL with the following command in a new terminal window:
avn service get pg-flink \
    --format '{service_uri_params}'
The output should be similar to the following:
{
    'dbname': 'defaultdb',
    'host': '<hostname>.aivencloud.com',
    'password': '<password>',
    'port': '13039',
    'sslmode': 'require',
    'user': 'avnadmin'
}
Take note of the host, port, user, dbname and password details above and use them to create a Flink table pointing to PostgreSQL. Paste the following SQL into Flink's SQL Client, replacing <host>, <port>, <dbname>, <username> and <password> with the values you just noted (<username> takes the user value).
create table job_summary_flink(
    job STRING,
    avg_salary BIGINT,
    nr_people BIGINT,
    PRIMARY KEY (job) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<host>:<port>/<dbname>?sslmode=require',
    'table-name' = 'job_summary',
    'username' = '<username>',
    'password' = '<password>'
);
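Assembling the url value by hand is an easy place to slip, so here is a small, purely illustrative shell sketch that builds it from the host, port and dbname values. The jdbc_url function and the example host below are hypothetical; only the URL shape comes from the table definition above.

```shell
#!/bin/sh
# Hypothetical helper (not part of the repository): prints the JDBC URL
# used in the 'url' option of the Flink table definition above.
# Arguments: host, port, database name.
jdbc_url() {
  printf 'jdbc:postgresql://%s:%s/%s?sslmode=require\n' "$1" "$2" "$3"
}

# Example with placeholder values; substitute the ones returned by avn.
jdbc_url "pg.example.com" 13039 defaultdb
```

The printed string can then be pasted as the 'url' value in the WITH clause.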
Now let's create the SQL pipeline with the following command in Flink SQL Client:
insert into job_summary_flink
select job,
    avg(salary),
    count(*)
from people_job
group by job;
We should see an output from the SQL Client like this:
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: b2d8b019c6c6e3dc5fe63902a14c13a9
And we can now check that Flink's table has been correctly populated with the following SQL from Flink's CLI:
select * from job_summary_flink;
which results in
+/-   job                      avg_salary   nr_people
+     UX Designer              70000        1
+     Crocodile domesticator   30000        1
+     Football Player          200000       1
+     Software Engineer        152500       2
We can also check that we obtain the same result from PostgreSQL with the following command in the PostgreSQL client terminal window:
select * from job_summary;
Resulting in:
          job           | avg_salary | nr_people
------------------------+------------+-----------
 UX Designer            |      70000 |         1
 Crocodile domesticator |      30000 |         1
 Football Player        |     200000 |         1
 Software Engineer      |     152500 |         2
(4 rows)
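As a sanity check on these numbers, we can recompute the same aggregation outside Flink. The sketch below uses awk on a copy of the five sample rows from the people_job query output. It assumes the CSV has no header line and that its columns are id, name, job, salary in that order; summarise_jobs is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: recomputes the per-job average salary and head count.
# Assumption: the CSV has no header and its columns are id,name,job,salary.
summarise_jobs() {
  awk -F',' '
    { sum[$3] += $4; n[$3]++ }
    END { for (j in sum) printf "%s|%d|%d\n", j, sum[j] / n[j], n[j] }
  ' "$1" | sort
}

# Sample rows copied from the people_job query output in this post.
sample=$(mktemp)
cat > "$sample" <<'EOF'
1,Ugo,Football Player,200000
2,Carlo,Crocodile domesticator,30000
3,Maria,Software Engineer,210000
4,Sandro,UX Designer,70000
5,Melissa,Software Engineer,95000
EOF

summarise_jobs "$sample"
rm -f "$sample"
```

For Software Engineer this gives (210000 + 95000) / 2 = 152500 across 2 people, matching the row in both Flink and PostgreSQL.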
Wrapping up
This article provides a way to run Apache Flink's SQL Client as a set of Docker containers. It expects additional data sources or targets to be available outside the containers and can be used to start the learning journey into Flink. The SQL pipeline example shows an integration between a local CSV file and PostgreSQL.
Some additional resources:
- Aiven for Apache Flink page
- Apache Flink Home page
- Apache Flink SQL Client
- Apache Flink SQL Client Demo
- Apache Flink Docker GitHub Repository