
Mar 3, 2022

Looking ahead to the new JSON SQL functions in Apache Flink® 1.15.0

Apache Flink® 1.15.0 will introduce new JSON SQL functions, allowing even more powerful data processing. Read on to find out what is in store.

Sergey Nuyanzin

Senior Software Engineer, OSPO

The Apache Flink® SQL APIs are becoming very popular and nowadays represent the main entry point for building streaming data pipelines. The Apache Flink® community keeps investing in them, with new options, functions and connectors being added in every release.

An example of the increasing interest in Flink SQL is the JSON support in Table SQL. JSON is one of the most used formats in the data world, with basic Apache Flink JSON functions being available in 1.14, and new capabilities added in every release. In this blog post we are going to look at what will be available in Apache Flink 1.15.0-SNAPSHOT, which is not officially released yet. Therefore, if you want to touch and play with these functions right now, you'll need to clone Apache Flink's source code and build it yourself, or wait for the official 1.15.0 release. You can find the details on how to build Apache Flink® at Building Flink from Source.
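At the time of writing, building from source boils down to cloning the repository and running a Maven build; the exact prerequisites (such as the supported Maven and Java versions) are listed on that page:

git clone https://github.com/apache/flink.git
cd flink
mvn clean install -DskipTests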

The use case

For the purpose of this blog post, we are going to mimic an inbound dataset of IoT sensors, which measure data within the area where they are located. On one side, the messages are in JSON format, with possibly nested JSON fields; on the other side, since devices could be defective, some messages could contain invalid JSON.

When monitoring an area we might want to run some analysis, for instance: for each timestamp and metric, find the group of areas with the corresponding maximum measurement value, and publish it as a new JSON message.

As we will see, Apache Flink® allows both parsing and building nested JSON with SQL statements alone; to reach our goal, SQL is all we need.

Explore the dataset

Once Apache Flink® 1.15.0 is ready to use, we can focus on the dataset; for instance, we could create a streaming dataset in an Apache Kafka® topic and connect Apache Flink® to it as explained in a previous blog post. However, to demonstrate the full power of the Apache Flink® JSON functions, we need a nested JSON dataset.

As we mentioned earlier we want to mimic IoT devices emitting sensor reading messages from various places around the world. We can store the following messages (one per line) in a file named iot-readings.json.

{"id":1,"name": "Temperature Sensor","payload": {"data": {"metric":"Temperature","value":23,"dimension":"℃ "},"location": "Berlin", "timestamp":"2018-12-10 13:45:00.000"}} {"id":2,"name": "Temperature Sensor","payload": {"data": {"metric":"Temperature","value":16,"dimension":"℃ "},"location": "Berlin", "timestamp":"2018-12-10 13:55:00.000"}} {"id":3,"name": "comment2","payload": "Out of Order"} {"id":4,"name": "Light Sensor","payload": {"data": {"metric": "Light", "value":23, "dimension": "Ev"}, "location": "Berlin", "timestamp": "2018-12-10 13:45:00.000"}} {"id":5,"name": "Noise Sensor", "payload": {"data": {"metric": "Noise", "value":43, "dimension": ""}, "timestamp": "2018-12-10 13:45:00.000"}} {"id":6,"name": "Temperature Sensor", "payload": {: {"metric":"Temperature","value":23,"dimension":"℃ "},"location": "Paris"}, "timestamp":"2018-12-10 13:45:00.000"} {"id":7,"name": "Light Sensor","payload": {"wrong_data": {"metric": "Light", "value":23, "dimension": "Ev"}, "location": "Paris", "timestamp":"2018-12-10 13:45:00.000"}} {"id":8,"name": "Noise Sensor","payload": {}} {"id":9,"name": "Temperature Sensor","payload": {"data": {"metric":"Temperature","value":23,"dimension":"℃ "},"location": "Paris", "timestamp":"2018-12-10 13:45:00.000"}} {"id":10,"name": "Temperature Sensor","payload": {"data": {"metric":"Temperature","value":12,"dimension":"℃ "},"location": "Paris", "timestamp":"2018-12-10 13:55:00.000"}} {"id":11,"name": "Temperature Sensor","payload": {"data": {"metric":"Temperature","value":23,"dimension":"℃ "},"location": "London", "timestamp":"2018-12-10 13:45:00.000"}} {"id":12,"name": "Temperature Sensor","payload": {"data": {"metric":"Temperature","value":23,"dimension":"℃ "},"location": "Rome", "timestamp":"2018-12-10 13:45:00.000"}} {"id":13,"name": "Temperature Sensor","payload": {"data": {"metric":"Temperature","value":28,"dimension":"℃ "},"location": "Rome", "timestamp":"2018-12-10 13:55:00.000"}}

Almost all the above messages have a JSON format and the payload attribute contains nested JSON. However, we can also expect some broken IoT devices in the field producing empty or wrong messages like {"id":3,"name": "comment2","payload": "Out of Order"}. We want to build reliable data pipelines, therefore we expect that receiving such incorrect messages from several defective devices is not going to stop the data flow.

One way to provide such messages to Apache Kafka is to extend the Python fake data producer for Apache Kafka; alternatively, we can use a tool like kcat.
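For example, with kcat, sending the file to a topic is a one-liner. Here the broker address (localhost:9092) and the topic name (iot-readings) are assumptions to adapt to your setup; the -l flag tells kcat to produce each line of the file as a separate message:

kcat -P -b localhost:9092 -t iot-readings -l iot-readings.json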

To be able to process the data in the Apache Kafka topic, we need to define an Apache Flink table on top. With Flink SQL client, we can define the table with the following SQL statement:

CREATE TABLE sensors (
    id STRING,
    name STRING,
    payload STRING -- we declare payload as STRING since there is no JSON datatype in Apache Flink®
) WITH (
    'connector' = 'kafka',
    ... -- other properties should be filled based on your connection settings to Apache Kafka
);
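As a sketch, a complete WITH clause could look like the following; the topic name, bootstrap servers and consumer group are assumptions matching the setup above, and json.ignore-parse-errors makes Flink skip messages that are not valid JSON (like the one from device 6) instead of failing the pipeline:

CREATE TABLE sensors (
    id STRING,
    name STRING,
    payload STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'iot-readings',                          -- assumption: the topic we produced to
    'properties.bootstrap.servers' = 'localhost:9092', -- assumption: your broker address
    'properties.group.id' = 'sensors-reader',          -- assumption: any consumer group id
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true'                -- skip unparseable messages instead of failing
);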

With the table defined, we can start filtering out the badly formed records coming from broken IoT devices. We can achieve that by keeping only the messages whose payload contains valid JSON with a data attribute. The new JSON_EXISTS function solves exactly this problem:

SELECT * FROM sensors WHERE JSON_EXISTS(payload, '$.data');

In the above SQL, the special character $ denotes the root node in a JSON path, from which we can access properties, like $.data. If we need to parse arrays, we can use a similar syntax to access a precise element by position ($.a[0].b), or retrieve a property for each element in the array ($.a[*].b).
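The JSON path functions also accept string literals, which is handy for sanity-checking a path from the SQL client before running it against the table. A minimal sketch:

SELECT
    JSON_VALUE('{"a": [{"b": 1}, {"b": 2}]}', '$.a[0].b') AS first_b,  -- returns '1'
    JSON_VALUE('{"a": [{"b": 1}, {"b": 2}]}', '$.a[1].b') AS second_b; -- returns '2'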

The JSON_EXISTS query above generates the following output:

id    name                  payload
1     Temperature Sensor    {"data":{"metric":"Temperatur~
2     Temperature Sensor    {"data":{"metric":"Temperatur~
4     Light Sensor          {"data":{"metric":"Light","va~
5     Noise Sensor          {"data":{"metric":"Noise","va~
9     Temperature Sensor    {"data":{"metric":"Temperatur~
10    Temperature Sensor    {"data":{"metric":"Temperatur~
11    Temperature Sensor    {"data":{"metric":"Temperatur~
12    Temperature Sensor    {"data":{"metric":"Temperatur~
13    Temperature Sensor    {"data":{"metric":"Temperatur~

After filtering only the valid messages, we can start analyzing the data. For example, we might need to extract the city, stored in the location attribute of the correctly formed messages. We can use the JSON_VALUE function, which retrieves a scalar from a JSON field, to extract the needed information:

SELECT DISTINCT
    JSON_VALUE(payload, '$.location') AS `city`
FROM sensors
WHERE JSON_EXISTS(payload, '$.data');

The result should be the following. Note that one of the rows has a <NULL> city; this is because the corresponding message doesn't contain the location attribute.

city
Berlin
<NULL>
Paris
London
Rome

The JSON_VALUE function extracts data as STRING by default. However, we might want to change the resulting datatype, specifically in cases where the output is a number on which we want to perform some calculation.

For instance, if we want to calculate the average temperature by city, we can get the sensor reading by extracting the payload field $.data.value, with the RETURNING option casting it to INTEGER. Finally, we can aggregate over the $.location field:

SELECT
    AVG(JSON_VALUE(payload, '$.data.value' RETURNING INTEGER)) AS `avg_temperature`,
    JSON_VALUE(payload, '$.location') AS `city`
FROM sensors
WHERE JSON_VALUE(payload, '$.data.metric') = 'Temperature'
GROUP BY JSON_VALUE(payload, '$.location');

And the expected output is:

avg_temperature    city
19                 Berlin
17                 Paris
23                 London
25                 Rome

So far, all the extracted fields were scalars. But Apache Flink® also provides a function to extract more complex data points, called JSON_QUERY.
For instance, the following query extracts the complete data field from all messages with valid JSON and shows NULL for the remaining ones.

SELECT JSON_QUERY(payload, '$.data') AS `data` FROM sensors;

The query output is:

data {"metric":"Temperature","valu~ {"metric":"Temperature","valu~ <NULL> {"metric":"Light","value":23,~ {"metric":"Noise","value":43,~ <NULL> <NULL> {"metric":"Temperature","valu~ {"metric":"Temperature","valu~ {"metric":"Temperature","valu~ {"metric":"Temperature","valu~ {"metric":"Temperature","valu~

So far we've seen how to parse JSON messages with SQL and extract the required fields, but that's only part of the story. Apache Flink® SQL also enables us to build nested JSON datasets. Let's first create a flat representation of our dataset above, with the MAX function extracting the peak measurement for a given location, metric and timestamp.

SELECT
    JSON_VALUE(payload, '$.location') AS loc,
    JSON_VALUE(payload, '$.data.metric') AS metric,
    TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp')) AS timestamp_value,
    MAX(JSON_VALUE(payload, '$.data.value')) AS max_value
FROM sensors
WHERE JSON_EXISTS(payload, '$.data')
    AND JSON_EXISTS(payload, '$.location')
GROUP BY
    JSON_VALUE(payload, '$.data.metric'),
    JSON_VALUE(payload, '$.location'),
    TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'));

The output is a flat table containing the peak value for each metric, city and timestamp:

loc       metric         max_value    timestamp_value
Berlin    Temperature    23           2018-12-10 13:45:00.000
Berlin    Temperature    16           2018-12-10 13:55:00.000
Berlin    Light          23           2018-12-10 13:45:00.000
Paris     Temperature    23           2018-12-10 13:45:00.000
Paris     Temperature    12           2018-12-10 13:55:00.000
London    Temperature    23           2018-12-10 13:45:00.000
Rome      Temperature    23           2018-12-10 13:45:00.000
Rome      Temperature    28           2018-12-10 13:55:00.000

Now, let's assume we want to create a single message per timestamp_value and metric, containing a JSON field that lists all the cities with an IoT reading together with the value of the reading itself.

To condense all the values into a single row, we can use the JSON_OBJECTAGG function, which builds a JSON object string by aggregating key-value expressions.

Apache Flink® SQL does not allow nesting one aggregate function inside another, therefore we need another wrapper.
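For illustration, this is the shape the planner rejects (kept as a comment on purpose, don't run it):

-- Rejected: an aggregate function (MAX) nested inside another aggregate (JSON_OBJECTAGG)
-- SELECT
--     JSON_OBJECTAGG(KEY JSON_VALUE(payload, '$.location')
--                    VALUE MAX(JSON_VALUE(payload, '$.data.value')))
-- FROM sensors;

Instead, we first compute the maximum in a common table expression, and only then aggregate the result into a JSON object: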

WITH sensors_with_max_metric AS (
    SELECT
        JSON_VALUE(payload, '$.location') AS loc,
        JSON_VALUE(payload, '$.data.metric') AS metric,
        TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp')) AS timestamp_value,
        MAX(JSON_VALUE(payload, '$.data.value')) AS max_value
    FROM sensors
    WHERE JSON_EXISTS(payload, '$.data')
        AND JSON_EXISTS(payload, '$.location')
    GROUP BY
        JSON_VALUE(payload, '$.data.metric'),
        JSON_VALUE(payload, '$.location'),
        TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))
)
SELECT
    timestamp_value,
    metric,
    JSON_OBJECTAGG(KEY loc VALUE max_value) AS json_object_value
FROM sensors_with_max_metric
GROUP BY timestamp_value, metric;

Note that JSON_OBJECTAGG allows us to define which fields to use as KEY and VALUE of the JSON object. The output of the above SQL is:

timestamp_value            metric         json_object_value
2018-12-10 13:45:00.000    Light          {"Berlin":"23"}
2018-12-10 13:45:00.000    Temperature    {"Berlin":"23","London":"23",~
2018-12-10 13:55:00.000    Temperature    {"Berlin":"16","Paris":"12","~

As a last step, we can create one unique nested JSON document per message, including all three columns above, with the JSON_OBJECT function, which behaves similarly to JSON_OBJECTAGG but without aggregation:

WITH sensors_with_max_metric AS (
    SELECT
        JSON_VALUE(payload, '$.location') AS loc,
        JSON_VALUE(payload, '$.data.metric') AS metric,
        TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp')) AS timestamp_value,
        MAX(JSON_VALUE(payload, '$.data.value')) AS max_value
    FROM sensors
    WHERE JSON_EXISTS(payload, '$.data')
        AND JSON_EXISTS(payload, '$.location')
    GROUP BY
        JSON_VALUE(payload, '$.data.metric'),
        JSON_VALUE(payload, '$.location'),
        TO_TIMESTAMP(JSON_VALUE(payload, '$.timestamp'))
),
sensors_with_max_metric_grouped_by_metric_and_timestamp AS (
    SELECT
        timestamp_value,
        metric,
        JSON_OBJECTAGG(KEY loc VALUE max_value) AS loc
    FROM sensors_with_max_metric
    GROUP BY timestamp_value, metric
)
SELECT
    JSON_OBJECT(
        KEY 'timestamp' VALUE timestamp_value,
        KEY 'metric' VALUE s.metric,
        KEY 'values' VALUE s.loc
    )
FROM sensors_with_max_metric_grouped_by_metric_and_timestamp s;

The final output is the following, with three nested JSON objects.

{"timestamp":"2018-12-10 13:45:00.000","metric":"Light","values":"{\"Berlin\":\"23\"}"} {"timestamp":"2018-12-10 13:45:00.000","metric":"Temperature","values":"{\"Berlin\":\"23\",\"London\":\"23\",\"Paris\":\"23\",\"Rome\":\"23\"}"} {"timestamp":"2018-12-10 13:55:00.000","metric":"Temperature","values":"{\"Berlin\":\"16\",\"Paris\":\"12\",\"Rome\":\"28\"}"}

We managed to parse and build nested JSON data structures using only SQL statements. There's no more need for custom functions to handle one of the most used data formats in data technology!

Next Steps

Driven by a huge community effort, Apache Flink®'s SQL support is becoming more and more powerful with every release. More features, connectors and functions are added all the time, making the SQL APIs the main driver for defining streaming data pipelines. The JSON functions explained here will land in version 1.15.0, but you can already start getting familiar with the huge set of SQL functions available today.
