Using Kafka Connect JDBC Source: a PostgreSQL® example

Find out how to use Apache Kafka® Connect to update an old app-to-db design to use up-to-date tech tools without disrupting the original solution.

If we go back a few years, the typical data pipeline was an app creating events and pushing them to a backend database. Data was then propagated to downstream applications via dedicated ETL flows at regular intervals, usually daily.

In these modern times, Apache Kafka® has become the default data platform. Apps write events to Kafka, which then distributes them in near-real-time to downstream sinks like databases or cloud storage. Apache Kafka® Connect, a framework to stream data into and out of Apache Kafka, represents a further optimisation that makes the ingestion and propagation of events just a matter of configuration file settings.

What if we're facing an old app-to-database design? How can we bring it up-to-date and include Kafka in the game? Instead of batch exporting from the database at night, we can add Kafka to the existing system. Kafka Connect lets us integrate with an existing system and make use of more modern tech tools, without disrupting the original solution.

One way to do this is to use the Kafka Connect JDBC Connector. This post will walk you through an example of sourcing data from an existing table in PostgreSQL® and populating a Kafka topic with only the changed rows. This is a great approach for many use cases. When no additional query load on the source system is allowed, however, you could instead make use of change data capture solutions based on tools like Debezium. As we'll see later, Aiven provides Kafka Connect as a managed service for both options, so you can start your connectors without the hassle of managing a dedicated cluster.

Architecture Overview

This blog post provides an example of the Kafka Connect JDBC Source based on a PostgreSQL database. A more detailed explanation of the connector is provided in our documentation, Create a JDBC source connector from PostgreSQL® to Apache Kafka®.

In our example, we first create a PostgreSQL database to act as backend data storage for our imaginary application. Then we create a Kafka cluster with Kafka Connect and show how any new or modified row in PostgreSQL appears in a Kafka topic.

Creating the PostgreSQL Source system

We'll create the whole setup using the Aiven Command Line Interface. Follow the instructions in the Aiven CLI documentation to install the avn command and log in.

Once you've logged in to the Aiven client, we can create a PostgreSQL database with the following avn command in our terminal:

avn service create pg-football \
    -t pg \
    --cloud google-europe-west3 \
    -p business-4

This command creates a PostgreSQL database (that's what -t pg does) named pg-football in the google-europe-west3 region. The selected plan, which determines the amount of resources available and the associated billing, is business-4.

The create command returns immediately: Aiven has received the request and started creating the instance. We can wait for the database to be ready with the following command:

avn service wait pg-football

The wait command can be executed against any Aiven instance, and returns only when the service is in RUNNING state.

Time to Scout Football Players

Now let's create our playground: we are a football scouting agency, checking players all over the world, and our app pushes the relevant data to a PostgreSQL table. Let's log in to PostgreSQL from the terminal:

avn service cli pg-football

Our agency doesn't do a great job at scouting: all we are able to capture is the player's name, nationality and a flag, is_retired, showing their activity status.
We create a simple football_players table containing the above information together with two control columns:

  • created_at keeping the record's creation time
  • modified_at for the row's last modification time

These two columns will later be used by the Kafka Connect connector to select the recently changed rows.
Now it's time to create the table from the PostgreSQL client:

CREATE TABLE football_players (
    name VARCHAR(50) PRIMARY KEY,
    nationality VARCHAR(255) NOT NULL,
    is_retired BOOLEAN DEFAULT false,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    modified_at TIMESTAMP
);

The created_at field will work as expected immediately, thanks to the DEFAULT NOW() definition.
The modified_at field, on the other hand, requires a bit more tuning to be usable. We'll need to create a trigger that sets it to the current timestamp whenever a row is updated. The following SQL can be executed from the PostgreSQL client:

CREATE OR REPLACE FUNCTION change_modified_at()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $$
BEGIN
    NEW.modified_at := NOW();
    RETURN NEW;
END;
$$;

CREATE TRIGGER modified_at_updates
BEFORE UPDATE ON football_players
FOR EACH ROW
EXECUTE PROCEDURE change_modified_at();

The first statement creates the change_modified_at function that will later be used by the modified_at_updates trigger.

Football Scouting App at Work

We can now simulate our football scouting app's behaviour by manually inserting three rows in the football_players table from the PostgreSQL client:

INSERT INTO football_players (name, nationality, is_retired) VALUES ('Andrea Pirlo','Italian', true);
INSERT INTO football_players (name, nationality, is_retired) VALUES ('Cristiano Ronaldo','Portuguese', false);
INSERT INTO football_players (name, nationality, is_retired) VALUES ('Megan Rapinoe','American', true);

We can verify that the created_at column is successfully populated in PostgreSQL with:

SELECT * FROM football_players;

This will output:

       name        | nationality | is_retired |         created_at         | modified_at
-------------------+-------------+------------+----------------------------+-------------
 Andrea Pirlo      | Italian     | t          | 2021-03-11 10:35:52.04076  |
 Cristiano Ronaldo | Portuguese  | f          | 2021-03-11 10:35:52.060104 |
 Megan Rapinoe     | American    | t          | 2021-03-11 10:35:52.673554 |
(3 rows)

Perfect, the app is working when inserting new rows. If only we could have an update to an existing row...

Breaking news - Pirlo comes out of retirement

Well, this was somewhat expected: Juventus FC went out of the Champions League and needed new energy in the midfield. We can update the relevant row with:

UPDATE football_players SET is_retired=false WHERE name='Andrea Pirlo';

We can check that modified_at is working correctly by issuing the same SELECT * FROM football_players; statement in the PostgreSQL client and checking the following output:

       name        | nationality | is_retired |         created_at         |        modified_at
-------------------+-------------+------------+----------------------------+----------------------------
 Cristiano Ronaldo | Portuguese  | f          | 2021-03-11 10:35:52.060104 |
 Megan Rapinoe     | American    | t          | 2021-03-11 10:35:52.673554 |
 Andrea Pirlo      | Italian     | f          | 2021-03-11 10:35:52.04076  | 2021-03-11 10:39:49.198286
(3 rows)

OK, we've recreated the original setup: our football scouting app is correctly storing data in the football_players table. In the old days, the extraction of that data was delegated to an ETL flow running overnight and pushing it to the downstream applications. Now, as per our original aim, we want to include Apache Kafka in the game, so... let's do it!

Creating a Kafka environment

As stated initially, our goal is to base our data pipeline on Apache Kafka without having to change the existing setup. We don't have a Kafka environment available right now, but we can easily create one using Aiven's CLI from the terminal with the following avn command:

avn service create kafka-football \
    -t kafka \
    --cloud google-europe-west3 \
    -p business-4 \
    -c kafka.auto_create_topics_enable=true \
    -c kafka_connect=true

The command creates an Apache Kafka instance (-t kafka) in google-europe-west3 with the business-4 plan.
Additionally, it enables topic auto-creation (-c kafka.auto_create_topics_enable=true) so our applications can create topics on the fly without forcing us to create them beforehand.
Finally, it enables Kafka Connect (-c kafka_connect=true) on the same Kafka instance. We can use the avn service wait command mentioned above to pause until the Kafka cluster is in RUNNING state.

Note that for Kafka instances on startup plans, you'll need to create a standalone Kafka Connect instance. For production systems, we recommend using a standalone Kafka Connect instance in any case, to keep concerns separated.

Connecting the dots

The basic building blocks are ready: our source system, represented by the pg-football PostgreSQL database with the football_players table, and the kafka-football Apache Kafka instance are both running. It's now time to connect the two, creating a new event in Kafka every time an inserted or modified row appears in PostgreSQL. That can be achieved by creating a Kafka Connect JDBC source connector.

Create a JSON configuration file

Start by creating a JSON configuration file like the following:

{
    "name": "pg-timestamp-source",
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?sslmode=require",
    "connection.user": "<PG_USER>",
    "connection.password": "<PG_PASSWORD>",
    "table.whitelist": "football_players",
    "mode": "timestamp",
    "timestamp.column.name": "modified_at,created_at",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_"
}

Where the important parameters are:

  • name: the name of the Kafka Connect connector, in our case pg-timestamp-source
  • connection.url: the connection URL pointing to the PostgreSQL database, in the form jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?<ADDITIONAL_PARAMETERS>. We can build it from the dbname, host and port values in the output of the following avn command:
    avn service get pg-football --format '{service_uri_params}'
  • connection.user and connection.password: PostgreSQL credentials. The default avnadmin credentials are available as the user and password values in the output of the avn command above
  • table.whitelist: list of tables to source from PostgreSQL, in our case football_players
  • mode: Kafka Connect JDBC mode. The available modes include bulk, incrementing and timestamp (the connector also supports a combined timestamp+incrementing mode). For this post we'll use timestamp. For a more detailed description of the modes, please refer to the help article
  • timestamp.column.name: comma-separated list of timestamp column names. We set it to modified_at,created_at because modified_at holds the most recent update timestamp and, when it is null (the row was never updated), the connector can fall back on created_at
  • poll.interval.ms: time between database polls
  • topic.prefix: prefix for topic, the full topic name will be a concatenation of topic.prefix and the PostgreSQL table name.
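
To avoid hand-editing the placeholders, the configuration can also be generated. The following Python snippet is a minimal sketch and not part of the original setup: the build_jdbc_source_config helper and the example_params values are hypothetical, and we assume the connection parameters have already been copied from the avn output into a dictionary with the dbname, host, port, user and password keys.

```python
import json


def build_jdbc_source_config(params, table, topic_prefix="pg_source_"):
    """Assemble a JDBC source connector config from PostgreSQL connection parameters.

    `params` is assumed to be a dict with the keys dbname, host, port,
    user and password (hypothetical input, copied by hand from the avn output).
    """
    url = "jdbc:postgresql://{host}:{port}/{dbname}?sslmode=require".format(**params)
    return {
        "name": "pg-timestamp-source",
        "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
        "connection.url": url,
        "connection.user": params["user"],
        "connection.password": params["password"],
        "table.whitelist": table,
        "mode": "timestamp",
        "timestamp.column.name": "modified_at,created_at",
        "poll.interval.ms": "2000",
        "topic.prefix": topic_prefix,
    }


# Placeholder values for illustration only; substitute your own service details.
example_params = {
    "dbname": "defaultdb",
    "host": "pg-football.example.com",
    "port": "5432",
    "user": "avnadmin",
    "password": "change-me",
}

config = build_jdbc_source_config(example_params, "football_players")
print(json.dumps(config, indent=2))
```

Redirecting the printed JSON to a file gives a configuration in the same shape as the one above, with the placeholders already filled in.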

Start the JDBC connector

After storing the above JSON in a file named kafka_jdbc_config.json, we can now start the Kafka Connect JDBC connector in our terminal with the following command:

avn service connector create kafka-football @kafka_jdbc_config.json

We can verify the status of the Kafka Connect connector with the following avn command:

avn service connector status kafka-football pg-timestamp-source

Note that the last parameter, pg-timestamp-source, in the avn command above refers to the Kafka Connect connector name defined in the name setting of the kafka_jdbc_config.json configuration file. If all settings are correct, the above command will show our healthy connector in RUNNING state:

{
    "status": {
        "state": "RUNNING",
        "tasks": [
            {
                "id": 0,
                "state": "RUNNING",
                "trace": ""
            }
        ]
    }
}
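
If we want to check the connector from a script rather than by eye, the status JSON can be parsed. The sketch below assumes the output has exactly the shape shown above. The connector_is_healthy helper name is hypothetical, and the FAILED sample is made up purely for illustration.

```python
import json


def connector_is_healthy(status_json):
    """Return True only if the connector and every one of its tasks are RUNNING."""
    status = json.loads(status_json)["status"]
    tasks = status.get("tasks", [])
    return (
        status["state"] == "RUNNING"
        and bool(tasks)  # a connector without tasks is not moving any data
        and all(task["state"] == "RUNNING" for task in tasks)
    )


# Same shape as the status output shown above.
running = '{"status": {"state": "RUNNING", "tasks": [{"id": 0, "state": "RUNNING", "trace": ""}]}}'
# Made-up example of a connector whose only task has failed.
failed = '{"status": {"state": "RUNNING", "tasks": [{"id": 0, "state": "FAILED", "trace": "example error"}]}}'

print(connector_is_healthy(running))
print(connector_is_healthy(failed))
```

Checking the tasks as well as the top-level state matters because a connector can report RUNNING while one of its tasks has stopped. In that case the trace field is the first place to look.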

Check the data in Kafka with kcat

The data should now have landed in Apache Kafka. How can we check it?
We can use kcat, a handy command line utility.

Once kcat is installed, we'll need to set up the connection to our Kafka environment.

Aiven enables SSL certificate-based authentication by default. The certificates can be downloaded manually from the Aiven console. With the Aiven CLI, we can skip the clicking and run the following commands in our terminal:

mkdir -p kafkacerts
avn service user-creds-download kafka-football \
    -d kafkacerts \
    --username avnadmin

These commands create a kafkacerts folder (if it doesn't already exist) and download into it the ca.pem, service.cert and service.key SSL certificates required to connect.

The last missing piece of information that kcat needs is where to find our Kafka instance in terms of hostname and port. This information can be displayed in our terminal with the following avn command:

avn service get kafka-football --format '{service_uri}'

Once we have collected the required info, we can create a kcat.config file with the following entries:

bootstrap.servers=<KAFKA_SERVICE_URI>
security.protocol=ssl
ssl.key.location=kafkacerts/service.key
ssl.certificate.location=kafkacerts/service.cert
ssl.ca.location=kafkacerts/ca.pem

Remember to substitute the <KAFKA_SERVICE_URI> with the output of the avn service get command mentioned above.

Now we are ready to read the topic from Kafka by pasting the following command in our terminal:

kcat -F kcat.config -C -t pg_source_football_players

Note that we are using kcat in consumer mode (flag -C) reading from the topic pg_source_football_players which is the concatenation of the topic.prefix setting in Kafka Connect and the name of our football_players PostgreSQL table.

As expected, since the connector is working, kcat will output the three messages present in the Kafka topic, matching the three rows in the football_players PostgreSQL table:

{"name":"Cristiano Ronaldo","nationality":"Portuguese","is_retired":false,"created_at":1615458952060,"modified_at":null}
{"name":"Megan Rapinoe","nationality":"American","is_retired":true,"created_at":1615458952673,"modified_at":null}
{"name":"Andrea Pirlo","nationality":"Italian","is_retired":false,"created_at":1615458952040,"modified_at":1615459189198}
% Reached end of topic pg_source_football_players [0] at offset 3
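
Notice that created_at and modified_at arrive as plain numbers. Judging from the values, they are milliseconds since the Unix epoch, so the microseconds stored in PostgreSQL are cut down to milliseconds. The Python snippet below is a small sketch of how a consumer could turn them back into readable timestamps. The epoch_ms_to_datetime helper is a hypothetical name, and treating the values as UTC is our assumption, since the columns are declared as TIMESTAMP without a time zone.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_ms_to_datetime(value):
    """Convert milliseconds since the Unix epoch to a UTC datetime (None stays None)."""
    if value is None:
        return None
    # Integer arithmetic via timedelta avoids floating point rounding.
    return EPOCH + timedelta(milliseconds=value)


# One of the messages shown above, as received from the topic.
message = (
    '{"name":"Andrea Pirlo","nationality":"Italian","is_retired":false,'
    '"created_at":1615458952040,"modified_at":1615459189198}'
)

record = json.loads(message)
created = epoch_ms_to_datetime(record["created_at"])
modified = epoch_ms_to_datetime(record["modified_at"])
print(record["name"], created.isoformat(), modified.isoformat())
```

The converted values line up with the created_at and modified_at columns we saw for Andrea Pirlo in PostgreSQL, down to the millisecond.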

Updating the listings

Now, let's see if our football scouts around the world can fetch some news for us.

Ronaldo retires, enter Gorlami

Wow, we found a new talent named Enzo Gorlami, and Cristiano Ronaldo officially retired today from professional football (please be aware that this post does not reflect football reality). Let's push both pieces of news to PostgreSQL:

INSERT INTO football_players (name, nationality, is_retired) VALUES ('Enzo Gorlami','Italian', false);
UPDATE football_players SET is_retired=true WHERE name='Cristiano Ronaldo';

We can verify that the data is correctly stored in the database:

defaultdb=> select * from football_players;
       name        | nationality | is_retired |         created_at         |        modified_at
-------------------+-------------+------------+----------------------------+----------------------------
 Megan Rapinoe     | American    | t          | 2021-03-11 10:35:52.673554 |
 Andrea Pirlo      | Italian     | f          | 2021-03-11 10:35:52.04076  | 2021-03-11 10:39:49.198286
 Enzo Gorlami      | Italian     | f          | 2021-03-11 11:09:49.411885 |
 Cristiano Ronaldo | Portuguese  | t          | 2021-03-11 10:35:52.060104 | 2021-03-11 11:11:36.790781
(4 rows)

And in kcat we receive the following two updates:

{"name":"Enzo Gorlami","nationality":"Italian","is_retired":false,"created_at":1615460989411,"modified_at":null}
% Reached end of topic pg_source_football_players [0] at offset 4
{"name":"Cristiano Ronaldo","nationality":"Portuguese","is_retired":true,"created_at":1615458952060,"modified_at":1615461096790}
% Reached end of topic pg_source_football_players [0] at offset 5
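
Why do only the new and the changed row show up, and in this order? The Python snippet below is a simplified simulation of timestamp mode, not the connector's actual code. On each poll it picks the rows whose modified_at (or created_at, when modified_at is null) is newer than the last timestamp it has seen, oldest first. The poll and effective_timestamp helpers are hypothetical names, and the in-memory list stands in for the PostgreSQL table.

```python
def effective_timestamp(row):
    """Mimic COALESCE(modified_at, created_at): prefer modified_at, else created_at."""
    return row["modified_at"] if row["modified_at"] is not None else row["created_at"]


def poll(rows, last_seen):
    """Return the rows changed after last_seen (oldest first) and the new offset."""
    changed = sorted(
        (row for row in rows if effective_timestamp(row) > last_seen),
        key=effective_timestamp,
    )
    if changed:
        last_seen = effective_timestamp(changed[-1])
    return changed, last_seen


# The table as it was before the connector started (timestamps in epoch milliseconds).
table = [
    {"name": "Andrea Pirlo", "created_at": 1615458952040, "modified_at": 1615459189198},
    {"name": "Cristiano Ronaldo", "created_at": 1615458952060, "modified_at": None},
    {"name": "Megan Rapinoe", "created_at": 1615458952673, "modified_at": None},
]

first_batch, offset = poll(table, last_seen=0)
first_names = [row["name"] for row in first_batch]

# Simulate the INSERT and the UPDATE from this section.
table.append({"name": "Enzo Gorlami", "created_at": 1615460989411, "modified_at": None})
table[1]["modified_at"] = 1615461096790

second_batch, offset = poll(table, last_seen=offset)
second_names = [row["name"] for row in second_batch]

print(first_names)
print(second_names)
```

The first poll returns all three original rows, with Andrea Pirlo last because of his later modified_at. The second returns only Enzo Gorlami and Cristiano Ronaldo, which is the order we saw in kcat.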

Wrapping up

This blog post showed how to easily integrate PostgreSQL and Apache Kafka with a fully managed, config-file-driven Kafka Connect JDBC connector. We used a timestamp-based approach to retrieve the rows changed since the previous poll and push them to a Kafka topic, at the cost of some additional query load on the source database.

An alternative method is represented by Change Data Capture solutions like Debezium, which, in the case of PostgreSQL, reads changes directly from WAL files, avoiding any additional query load on the source database. A guide on how to set up CDC for Aiven PostgreSQL is provided in Create a Debezium source connector from PostgreSQL® to Apache Kafka®.
