Streaming anomaly detection with Apache Flink®, Apache Kafka® and PostgreSQL®

Learn how to build a streaming anomaly detection system. In particular:

  • How to create a fake streaming dataset.
  • How to create and use Apache Kafka for data streaming.
  • How to create and use PostgreSQL® to store threshold data.
  • How to create and use Apache Flink® to define streaming data pipelines.
  • How to push the outcome of the anomaly detection system as a Slack notification.

Anomaly detection

Anomaly detection is a way to find unusual or unexpected things in data. It is helpful in a variety of fields, such as fraud detection, network security, quality control and others. Let's build our own streaming anomaly detection system.

For example: a payment processor might set up anomaly detection for an e-commerce store if it notices that the store - which sells its items in Indian Rupees and is only configured to sell to the Indian market - is suddenly receiving a high volume of orders from Spain. This behavior could indicate fraud. Another example is a domain hosting service presenting a CAPTCHA to an IP address that it detects interacting with one of its domains in rapid succession.

While it's often easier to validate anomalies once the data has been stored in the database, it's more useful to check in-stream and address unwanted activity before it affects our dataset.

We can check for anomalies in data by creating filtering pipelines using Apache Flink®. Apache Flink is a flexible, open source tool for in-stream and batch processing of data. It lets us run SQL-like queries against a stream of data and perform actions based on the results of those queries.

We'll use a fake Internet of Things (IoT) sensor that generates CPU usage data for various devices as our continuous flow of data. Once the data is flowing, we'll also create a basic filtering pipeline to separate out the readings that surpass a fixed threshold (80%).

This example mimics a scenario where you might want to separate and generate alerts for anomalies in single events. For instance, a sensor reporting 99% CPU utilization might indicate a heating problem, so you might want to notify the team in charge of the inventory to schedule a replacement.

However, receiving a notification on every sensor reading that surpasses a fixed threshold can be overwhelming and create false positives for short usage spikes. Therefore you'll create a second, more advanced pipeline that averages the sensor readings over 30-second windows and then compares the results with per-device thresholds stored in a reference table.

Averaging the CPU value over 30-second windows improves the initial implementation and helps avoid false-positive notifications for single spikes, while still letting you flag components at potential risk of overheating. The threshold lookup enables a more precise definition of alert ranges depending on the device type.

The tutorial includes:

  • Apache Flink for data transformation.
  • Apache Kafka for data streaming.
  • PostgreSQL® for data storage/query.
  • Slack as the notification system.

In this tutorial we'll be using Aiven services, specifically Aiven for Apache Flink®, Aiven for Apache Kafka®, and Aiven for PostgreSQL®, all of which are based on widely available open source tools. We encourage you to sign up for a free trial to follow along, as it reduces to nearly zero any issues you might have with networking and getting the services to communicate with each other.

Architecture overview

To build a near real-time anomaly detection system, you'll create a streaming data pipeline that processes the IoT sensor readings as soon as they are generated. The pipeline relies on two sources: an Apache Kafka® topic that contains the fake stream of IoT metrics data, and a table in a PostgreSQL® database containing the alerting thresholds defined for each IoT device. An Apache Flink® service then combines the data, applies SQL transformations to find the anomalies, and pushes the results to a separate Apache Kafka® topic or to a Slack channel for team notification.

Prerequisites

The tutorial uses Aiven services, therefore you'll need a valid Aiven account. On top of the Aiven account, you will also need the following three items:

  • Docker, to run the fake IoT data generator.
  • The psql command line client, to connect to PostgreSQL® and create the threshold table.
  • A Slack app with an authentication token and the ID of the channel where you want to receive the notifications.

Create the Aiven services

In this section you'll create all the services needed to define the anomaly detection system via the Aiven Console:

Create an Aiven for Apache Kafka® service

The Aiven for Apache Kafka service is responsible for receiving the inbound stream of IoT sensor readings. Create the service with the following steps:

  1. Log in to the Aiven web console.

  2. On the Services page, click Create service.

    This opens a new page with the available service options.

    Aiven Console view for creating a new service

  3. Select Apache Kafka®.

  4. Select the cloud provider and region that you want to run your service on.

  5. Select business-4 as the service plan. The business-4 plan provides the service integrations needed to run Apache Flink streaming transformations over the Apache Kafka topics.

  6. Enter demo-kafka as the name for your service.

  7. Click Create service under the summary on the right side of the console.

Customise the Aiven for Apache Kafka service

Now that your service is created, you need to customise its functionality. On the Service settings page of your freshly created service, you'll see a bunch of toggles and properties. Change these two:

  1. Enable the Apache Kafka REST APIs to manage and query topic messages via the Aiven Console.

    Navigate to Service settings page > Service management section > actions (...) menu > Enable REST API (Karapace).

  2. Enable the automatic creation of Apache Kafka topics, so that new Apache Kafka® topics are created on the fly when the first record is pushed to them.

    Navigate to Service settings page > Advanced configuration section > Configure > Add configuration options > kafka.auto_create_topics_enable, enable the selected parameter using the toggle switch, and select Save configuration.
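
If you prefer not to rely on automatic topic creation, you can also create the cpu_load_stats_real topic up front. The following is a minimal sketch using the kafka-python admin client, not a step of the tutorial; the <KAFKA_SERVICE_URI> placeholder, the partition count, and the SSL file names stand in for the connection details and files you can download from the demo-kafka service page in the Aiven Console.

    # Minimal sketch (optional): create the topic explicitly instead of
    # relying on kafka.auto_create_topics_enable. <KAFKA_SERVICE_URI>,
    # the partition count and the SSL file names are placeholders for
    # your own demo-kafka connection details.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(
        bootstrap_servers="<KAFKA_SERVICE_URI>",
        security_protocol="SSL",
        ssl_cafile="ca.pem",
        ssl_certfile="service.cert",
        ssl_keyfile="service.key",
    )
    admin.create_topics([NewTopic(name="cpu_load_stats_real",
                                  num_partitions=3,
                                  replication_factor=2)])
    admin.close()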

Create an Aiven for PostgreSQL® service

The PostgreSQL database is where you'll store the threshold data for each IoT device. These thresholds represent the alerting range of each IoT device: for example, one device might trigger an alert when usage is over 90%, while for another the threshold might be 60%.

You can create the Aiven for PostgreSQL database with the following steps:

  1. Log in to the Aiven web console.
  2. On the Services page, click Create service.
  3. Select PostgreSQL®.
  4. Select the cloud provider and region that you want to run your service on.
  5. Select Startup-4 as the service plan. The Startup-4 plan provides the service integrations needed to run Apache Flink streaming transformations over the data in the PostgreSQL® table.
  6. Enter demo-postgresql as the name for your service.
  7. Click Create service under the summary on the right side of the console.

Create an Aiven for Apache Flink® service

The Apache Flink service is where you'll define the streaming data pipelines to calculate and detect the anomalies.

You can create the Aiven for Apache Flink service with the following steps:

  1. Log in to the Aiven web console.
  2. On the Services page, click Create service.
  3. Select Apache Flink®.
  4. Select the cloud provider and region that you want to run your service on.
  5. Select business-4 as the service plan. business-4 is the smallest plan available for Aiven for Apache Flink and is enough to define all the data transformations in this tutorial.
  6. Enter demo-flink as the name for your service.
  7. Click Create service under the summary on the right side of the console.

After creating the service, you'll be redirected to the service details page. Apache Flink doesn't work in isolation: it needs data sources and sinks. Therefore you'll need to define the integrations between the Apache Flink service and:

  • Aiven for Apache Kafka®, which contains the stream of IoT sensor readings.
  • Aiven for PostgreSQL®, which contains the alerting thresholds.

You can define the service integrations on the Aiven for Apache Flink® Overview page with the following steps:

  1. Select Create data pipeline in section Create and manage your data streams with ease at the top of the Overview page.
  2. In the Data Service Integrations window, select the Aiven for Apache Kafka checkbox and, next, select the demo-kafka service. Select Integrate.
  3. Back on the Overview page, in the Data Flow section, select the + icon.
  4. In the Data Service Integrations window, select the Aiven for PostgreSQL checkbox and, next, select the demo-postgresql service. Select Integrate.

Once the above steps are completed, your Data Flow section should look similar to the following:

Aiven for Apache Flink Overview tab, showing the Integrations to Aiven for Apache Kafka and Aiven for PostgreSQL

Set up the IoT metrics streaming dataset

Now that the plumbing of all the components is sorted, it's time to create a continuous stream of fake IoT data that will land in an Aiven for Apache Kafka topic. There are various ways to generate fake data; for this tutorial you'll use the Dockerized fake data producer for Aiven for Apache Kafka®, which lets you generate a continuous flow of data with minimal setup.
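
For reference only, records of the same shape could also be produced with a few lines of Python. The sketch below assumes the kafka-python package and the SSL files (CA certificate, access certificate, access key) downloaded from the demo-kafka service page; <KAFKA_SERVICE_URI> is a placeholder for the service's host and port. The rest of the tutorial assumes the Dockerized producer, so treat this only as an illustration of the message format.

    # Illustrative alternative to the Dockerized producer: push fake IoT
    # readings straight from Python. <KAFKA_SERVICE_URI> and the SSL file
    # names are placeholders for your demo-kafka connection details.
    import json
    import random
    import time

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="<KAFKA_SERVICE_URI>",
        security_protocol="SSL",
        ssl_cafile="ca.pem",
        ssl_certfile="service.cert",
        ssl_keyfile="service.key",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    hostnames = ["doc", "grumpy", "sleepy", "bashful", "happy", "sneezy", "dopey"]

    while True:
        # One fake CPU reading per second, matching the tutorial's JSON format
        producer.send("cpu_load_stats_real", {
            "hostname": random.choice(hostnames),
            "cpu": f"cpu{random.randint(1, 4)}",
            "usage": random.uniform(0, 100),
            "occurred_at": int(time.time() * 1000),
        })
        time.sleep(1)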

Create an Aiven authentication token

The Dockerized fake data producer for Aiven for Apache Kafka® requires an Aiven authentication token to fetch all the Apache Kafka connection parameters.

You can create an authentication token with the following steps:

  1. Log in to the Aiven Console.

  2. Click the user icon in the top-right corner of the page.

  3. Click the Tokens tab.

  4. Click the Generate token button.

  5. Enter a description (optional) and a time limit (optional) for the token. Leave the Max age hours field empty if you do not want the token to expire.

    Aiven Console showing the authentication tokens

  6. Click Generate token.

  7. Click the Copy icon or select and copy the access token.

    note

    You cannot get the token later after you close this view.

  8. Store the token safely and treat this just like a password.

  9. Click Close.

Start the fake IoT data generator

It's time to start streaming the fake IoT data that you'll later process with Apache Flink:

note

You can also use other existing data, although the examples in this tutorial are based on the IoT sample data.

  1. Clone the Dockerized fake data producer for Aiven for Apache Kafka® repository to your computer:

    git clone https://github.com/aiven/fake-data-producer-for-apache-kafka-docker.git
  2. Navigate to the fake-data-producer-for-apache-kafka-docker directory and copy the conf/env.conf.sample file to conf/env.conf.

  3. Edit the conf/env.conf file and update the following parameters:

    • PROJECT_NAME to the Aiven project name where your services have been created.

    • SERVICE_NAME to the Aiven for Apache Kafka service name demo-kafka.

    • TOPIC to cpu_load_stats_real.

    • NR_MESSAGES to 0.

      note

      The NR_MESSAGES option defines the number of messages that the tool creates when you run it. Setting this parameter to 0 creates a continuous flow of messages that never stops.

    • USERNAME to the username you use to log in to the Aiven Console.

    • TOKEN to the Aiven authentication token generated in the previous step of this tutorial.

  4. Run the following command to build the Docker image:

    docker build -t fake-data-producer-for-apache-kafka-docker .
  5. Run the following command to run the Docker image:

    docker run fake-data-producer-for-apache-kafka-docker

    You should now see the above command pushing IoT sensor reading events to the cpu_load_stats_real topic in your Apache Kafka® service:

    {"hostname": "dopey", "cpu": "cpu4", "usage": 98.3335306302198, "occurred_at": 1633956789277}
    {"hostname": "sleepy", "cpu": "cpu2", "usage": 87.28240549074823, "occurred_at": 1633956783483}
    {"hostname": "sleepy", "cpu": "cpu1", "usage": 85.3384018012967, "occurred_at": 1633956788484}
    {"hostname": "sneezy", "cpu": "cpu1", "usage": 89.11518629380006, "occurred_at": 1633956781891}
    {"hostname": "sneezy", "cpu": "cpu2", "usage": 89.69951046388306, "occurred_at": 1633956788294}

Check the data in Apache Kafka

To check if your fake data producer is running, head to Apache Kafka in the Aiven console and look for the cpu_load_stats_real topic:

  1. Log in to the Aiven web console.

  2. Click on the Aiven for Apache Kafka service name demo-kafka.

  3. Click on Topics in the left sidebar.

  4. On the cpu_load_stats_real line, select the ... symbol and then click on Topic messages.

    Aiven for Apache Kafka Topic tab, showing the ``cpu_load_stats_real`` topic being created and the location of the ``...`` icon

  5. Click on the Fetch Messages button.

  6. Toggle the Decode from base64 option.

  7. You should see the messages being pushed to the Apache Kafka topic:

    detail of the messages in the ``cpu_load_stats_real`` topic including both key and value in JSON format

  8. Click again on the Fetch Messages button to refresh the visualization with new messages.

Create a basic anomaly detection pipeline with filtering

The first anomaly detection pipeline that you'll create showcases a basic anomaly detection system: you want to flag any sensor reading exceeding a fixed 80% threshold, since it could represent a heating anomaly. You'll read the IoT sensor readings from the cpu_load_stats_real topic in Apache Kafka, build a filtering pipeline in Apache Flink, and push the readings above the 80% threshold back to Apache Kafka, but to a separate cpu_load_stats_real_filter topic.

The steps to create the filtering pipeline are the following:

  1. Create a new Aiven for Apache Flink application.
  2. Define a source table to read the metrics data from your Apache Kafka® topic.
  3. Define a sink table to send the processed messages to a separate Apache Kafka® topic.
  4. Define a SQL transformation definition to process the data.
  5. Create an application deployment to execute the pipeline.

If you feel brave, you can go ahead and try it yourself in the Aiven Console. Otherwise you can follow the steps below:

  1. In the Aiven Console, open the Aiven for Apache Flink service named demo-flink and go to Applications in the left sidebar.

  2. Click Create new application to create your Flink application.

    The Apache Flink **Application** tab with the **Create Application** button

  3. Name the new application filtering and click Create application.

    The Apache Flink **Application** named ``filtering``

  4. Create the first version of the application by clicking the Create first version button.

  5. In the Add source tables tab, create the source table (named CPU_IN) pointing to the Apache Kafka® topic cpu_load_stats_real where the IoT sensor readings are stored:

    • Select Aiven for Apache Kafka - demo-kafka as Integrated service.

    • Paste the following SQL:

      CREATE TABLE CPU_IN (
          hostname STRING,
          cpu STRING,
          usage DOUBLE,
          occurred_at BIGINT,
          time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
      )
      WITH (
          'connector' = 'kafka',
          'properties.bootstrap.servers' = '',
          'topic' = 'cpu_load_stats_real',
          'value.format' = 'json',
          'scan.startup.mode' = 'earliest-offset'
      )

    Once created, the source table tab should look like the following:

    Source table tab with ``CPU_IN`` table defined

    Before saving the source table definition, you can check if it matches the data in the topic by clicking on the triangle next to Run. You should see the populated data.

    The Apache Flink source definition with SQL preview of the data

  6. Navigate to the Add sink table tab.

  7. Create the sink table (named CPU_OUT_FILTER), pointing to a new Apache Kafka® topic named cpu_load_stats_real_filter where the readings exceeding the 80% threshold will land, by:

    • Clicking on the Add your first sink table.

    • Selecting Aiven for Apache Kafka - demo-kafka as Integrated service.

    • Pasting the following SQL:

      CREATE TABLE CPU_OUT_FILTER (
          time_ltz TIMESTAMP(3),
          hostname STRING,
          cpu STRING,
          usage DOUBLE
      )
      WITH (
          'connector' = 'kafka',
          'properties.bootstrap.servers' = '',
          'topic' = 'cpu_load_stats_real_filter',
          'value.format' = 'json',
          'scan.startup.mode' = 'earliest-offset'
      )

    Once created, the sink table tab should look like the following:

    Sink table tab with ``CPU_OUT_FILTER`` table defined

  8. Navigate to the Create statement tab.

  9. Enter the following as the transformation SQL statement, taking data from the CPU_IN table and pushing the samples over the 80% threshold to CPU_OUT_FILTER:

    INSERT INTO CPU_OUT_FILTER
    SELECT
        time_ltz,
        hostname,
        cpu,
        usage
    FROM CPU_IN
    WHERE usage > 80

    If you're curious, you can preview the output of the transformation by clicking on the triangle next to the Run section; the Create statement window should look similar to the following image.

    The Apache Flink data transformation with SQL preview of the data

  10. Click Save and deploy later.

  11. Click Create deployment.

  12. Accept the default deployment parameters and click on Deploy without a savepoint.

    Detail of the new deployment screen showing the default version, savepoint and parallelism parameters

  13. The new application deployment status will show Initializing and then Running: version 1.

Once the application is running, you should start to see messages indicating hosts with high CPU loads in the cpu_load_stats_real_filter topic of your demo-kafka Apache Kafka service.

important

Congratulations! You created your first streaming anomaly detection pipeline!

The data is now available in the Apache Kafka topic named cpu_load_stats_real_filter; from there you could either write your own Apache Kafka consumer to read the flagged sensor readings or use Kafka Connect to sink the data to a wide range of technologies.
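
For example, a minimal consumer sketch along the following lines would print every flagged reading; it assumes the kafka-python package, with <KAFKA_SERVICE_URI> and the SSL file names as placeholders for your demo-kafka connection details.

    # Minimal sketch of a consumer for the filtered readings; the
    # connection details are placeholders for your own demo-kafka service.
    import json

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "cpu_load_stats_real_filter",
        bootstrap_servers="<KAFKA_SERVICE_URI>",
        security_protocol="SSL",
        ssl_cafile="ca.pem",
        ssl_certfile="service.cert",
        ssl_keyfile="service.key",
        auto_offset_reset="earliest",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )

    for message in consumer:
        reading = message.value
        print(f"High CPU on {reading['hostname']}/{reading['cpu']}: {reading['usage']:.1f}%")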

Evolve the anomaly detection pipeline with windowing and threshold lookup

In most production environments, you wouldn't want to send an alert on every measurement above the threshold. Sometimes CPUs spike momentarily, for example, and come back down in usage milliseconds later. What's really useful in production is knowing whether a CPU spike is sustained over a certain period of time.

If CPU usage stays high for a 30-second interval, there might be a problem. In this step, you'll aggregate the CPU load over a configured time window based on the event time. By averaging the CPU values over a time window you can filter out short-term spikes in usage, and flag only anomaly scenarios where the usage is consistently above a pre-defined threshold for a longer period of time.

To add a bit of complexity, and to mimic a real scenario, we'll also move away from the fixed 80% threshold and compare the average utilization figures with per-device thresholds, stored in a reference table in PostgreSQL and keyed by hostname. Every IoT device is different, and different devices usually have different alerting ranges. The reference table provides an example of variable, device-dependent thresholds.
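
Conceptually, a tumbling window splits the stream into fixed, non-overlapping buckets (here 30 seconds wide) keyed by device, and aggregates each bucket independently. The plain-Python sketch below only illustrates those semantics; the actual aggregation is expressed declaratively in the Flink SQL you'll write in the next section.

    # Conceptual illustration of 30-second tumbling-window averaging,
    # keyed by (hostname, cpu). Not how Flink runs it internally.
    from collections import defaultdict

    WINDOW_MS = 30_000

    def window_averages(readings):
        """readings: iterable of dicts with hostname, cpu, usage, occurred_at (ms)."""
        buckets = defaultdict(list)
        for r in readings:
            # Assign each reading to the 30-second bucket its timestamp falls into
            window_start = (r["occurred_at"] // WINDOW_MS) * WINDOW_MS
            buckets[(r["hostname"], r["cpu"], window_start)].append(r["usage"])
        # One (average, maximum) pair per device and per 30-second window
        return {key: (sum(vals) / len(vals), max(vals)) for key, vals in buckets.items()}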

Create the windowing pipeline

In this step, you'll create a pipeline to average the CPU metrics in 30-second windows. Averaging the metric over a time window avoids notifications for temporary spikes.

note

In this section, you will be able to reuse the CPU_IN source table definition created previously. Importing a working table definition, rather than re-defining it, is a good practice to avoid mistakes.

To complete the section, you will perform the following steps:

  • Create a new Aiven for Apache Flink application.
  • Import the previously created CPU_IN source table to read the metrics data from your Apache Kafka® topic.
  • Define a sink table to send the processed messages to a separate Apache Kafka® topic.
  • Define a SQL transformation definition to process the data.
  • Create an application deployment to execute the pipeline.

You can go ahead and try to define the windowing pipeline yourself. If, on the other hand, you prefer a step-by-step approach, follow the instructions below:

  1. In the Aiven Console, open the Aiven for Apache Flink service and go to the Applications tab.

  2. Click on Create new application and name it cpu_agg.

  3. Click on Create first version.

  4. To import the source CPU_IN table from the previously created filtering application:

    1. Click on Import existing source table
    2. Select filtering as application, Version 1 as version, CPU_IN as table and click Next
    3. Click on Add table
  5. Navigate to the Add sink tables tab.

  6. Create the sink table (named CPU_OUT_AGG) pointing to a new Apache Kafka® topic named cpu_agg_stats, where the 30-second aggregated data will land, by:

    • Clicking on the Add your first sink table.

    • Selecting Aiven for Apache Kafka - demo-kafka as Integrated service.

    • Pasting the following SQL:

      CREATE TABLE CPU_OUT_AGG (
          window_start TIMESTAMP(3),
          window_end TIMESTAMP(3),
          hostname STRING,
          cpu STRING,
          usage_avg DOUBLE,
          usage_max DOUBLE
      )
      WITH (
          'connector' = 'kafka',
          'properties.bootstrap.servers' = '',
          'topic' = 'cpu_agg_stats',
          'value.format' = 'json',
          'scan.startup.mode' = 'earliest-offset'
      )
    • Click Add table.

  7. Navigate to the Create statement tab.

  8. Enter the following as the transformation SQL statement, taking data from the CPU_IN table, aggregating the data over a 30-second window, and pushing the output to CPU_OUT_AGG:

    INSERT INTO CPU_OUT_AGG
    SELECT
        window_start,
        window_end,
        hostname,
        cpu,
        AVG(usage),
        MAX(usage)
    FROM TABLE(TUMBLE(TABLE CPU_IN, DESCRIPTOR(time_ltz), INTERVAL '30' SECONDS))
    GROUP BY
        window_start,
        window_end,
        hostname,
        cpu
  9. Click Save and deploy later.

  10. Click Create deployment.

  11. Accept the default deployment parameters and click on Deploy without a savepoint.

  12. The new application deployment status will show Initializing and then Running: version 1.

When the application is running, you should start to see messages containing the 30-second CPU averages in the cpu_agg_stats topic of your demo-kafka service.

Create a threshold table in PostgreSQL

You will use a PostgreSQL table to store the various IoT thresholds based on the hostname. The table will later be used by a Flink application to compare the average CPU usage with the thresholds and send the notifications to a Slack channel.

You can create the thresholds table in the demo-postgresql service with the following steps:

note

The below instructions assume psql is installed on your local machine.

  1. In the Aiven Console, open the Aiven for PostgreSQL service demo-postgresql.

  2. In the Overview tab locate the Service URI parameter and copy the value.

  3. Connect via psql to the demo-postgresql service with the following terminal command, replacing the <SERVICE_URI> placeholder with the Service URI string copied in the step above:

    psql "<SERVICE_URI>"
  4. Create the cpu_thresholds table and populate the values with the following code:

    CREATE TABLE cpu_thresholds (
        hostname VARCHAR,
        allowed_top INT
    );

    INSERT INTO cpu_thresholds VALUES
        ('doc', 20),
        ('grumpy', 30),
        ('sleepy', 40),
        ('bashful', 60),
        ('happy', 70),
        ('sneezy', 80),
        ('dopey', 90);
  5. Enter the following command to check that the threshold values are correctly populated:

    SELECT * FROM cpu_thresholds;

    The output shows you the content of the table:

    hostname | allowed_top
    ---------+-------------
    doc      | 20
    grumpy   | 30
    sleepy   | 40
    bashful  | 60
    happy    | 70
    sneezy   | 80
    dopey    | 90
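
The same table can also be read, or maintained, programmatically rather than through psql. As a minimal sketch, assuming the psycopg2 package is installed and using the same <SERVICE_URI> placeholder as above:

    # Minimal sketch: read the per-device thresholds with psycopg2
    # instead of psql. <SERVICE_URI> is the demo-postgresql Service URI.
    import psycopg2

    conn = psycopg2.connect("<SERVICE_URI>")
    with conn.cursor() as cur:
        cur.execute("SELECT hostname, allowed_top FROM cpu_thresholds ORDER BY allowed_top")
        for hostname, allowed_top in cur.fetchall():
            print(f"{hostname}: alert above {allowed_top}%")
    conn.close()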

Create the notification pipeline comparing average CPU data with the thresholds

At this point, you should have both a stream of 30-second average CPU metrics coming from Apache Kafka, and a set of per-device thresholds stored in the PostgreSQL database. This section showcases how you can compare the usage with the thresholds and send a Slack notification when the usage exceeds a device's threshold.
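
Before wiring the Slack token into the Flink application, it can be worth verifying the credentials on their own. The following is a minimal sketch using the slack_sdk package, assuming the token has the chat:write scope; replace the placeholders with your own token and channel ID.

    # Minimal sketch: verify the Slack token and channel ID before using
    # them in the Flink sink table. Placeholders must be replaced.
    from slack_sdk import WebClient

    client = WebClient(token="<SLACK_TOKEN>")
    client.chat_postMessage(
        channel="<CHANNEL_ID>",
        text="Test notification from the anomaly detection tutorial",
    )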

You can complete the section with the following steps:

  • Create a new Aiven for Apache Flink application.
  • Create a source table to read the aggregated metrics data from your Apache Kafka® topic.
  • Define a sink table to send the processed messages to a separate Slack channel.
  • Define a SQL transformation definition to process the data.
  • Create an application deployment to execute the pipeline.

To create the notification data pipeline, you can go ahead and try it yourself or follow the steps below:

  1. In the Aiven Console, open the Aiven for Apache Flink service and go to the Applications tab.

  2. Click on Create new application and name it cpu_notification.

  3. Click on Create first version.

  4. To create a source table CPU_IN_AGG pointing to the Apache Kafka topic cpu_agg_stats:

    • Click on Add your first source table.

    • Select Aiven for Apache Kafka - demo-kafka as Integrated service.

    • Paste the following SQL:

      CREATE TABLE CPU_IN_AGG (
          window_start TIMESTAMP(3),
          window_end TIMESTAMP(3),
          hostname STRING,
          cpu STRING,
          usage_avg DOUBLE,
          usage_max DOUBLE
      )
      WITH (
          'connector' = 'kafka',
          'properties.bootstrap.servers' = '',
          'topic' = 'cpu_agg_stats',
          'value.format' = 'json',
          'scan.startup.mode' = 'earliest-offset'
      )
    • Click Add table.

  5. To create a source table CPU_THRESHOLDS pointing to the PostgreSQL table cpu_thresholds:

    • Click on Add new table.

    • Select Aiven for PostgreSQL - demo-postgresql as Integrated service.

    • Paste the following SQL:

      CREATE TABLE CPU_THRESHOLDS (
          hostname STRING,
          allowed_top INT,
          PRIMARY KEY (hostname) NOT ENFORCED
      )
      WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:postgresql://',
          'table-name' = 'public.cpu_thresholds'
      )
    • Click Add table.

  6. Navigate to the Add sink tables tab.

  7. To create a sink table SLACK_SINK pointing to a Slack channel for notifications:

    • Click on Add your first sink table.

    • Select No integrated service as Integrated service.

    • Paste the following SQL, replacing the <SLACK_TOKEN> placeholder with the Slack authentication token:

      CREATE TABLE SLACK_SINK (
          channel_id STRING,
          message STRING
      )
      WITH (
          'connector' = 'slack',
          'token' = '<SLACK_TOKEN>'
      )
  8. Navigate to the Create statement tab.

  9. Enter the following as the transformation SQL statement, taking data from the CPU_IN_AGG table, comparing it with the threshold values from CPU_THRESHOLDS and pushing the samples over the threshold to SLACK_SINK:

    INSERT INTO SLACK_SINK
    SELECT
        '<CHANNEL_ID>',
        'host:' || CPU.hostname ||
        ' CPU: ' || cpu ||
        ' avg CPU value:' || TRY_CAST(usage_avg AS STRING) ||
        ' over the threshold ' || TRY_CAST(allowed_top AS STRING)
    FROM CPU_IN_AGG CPU
    INNER JOIN CPU_THRESHOLDS ON CPU.hostname = CPU_THRESHOLDS.hostname
    WHERE usage_avg > allowed_top
    note

    The <CHANNEL_ID> placeholder needs to be replaced by the Slack channel ID parameter.

  10. Click Save and deploy later.

  11. Click Create deployment.

  12. Accept the default deployment parameters and click on Deploy without a savepoint.

  13. The new application deployment status will show Initializing and then Running: version 1.

When the application is running, you should start to see notifications in the Slack channel about the IoT devices whose CPU usage goes over the defined thresholds.

A list of Slack notifications driven by the anomaly detection data pipeline

You created an advanced streaming data pipeline that includes windowing, joins data coming from different technologies, and feeds a Slack notification system.