
Introducing Aiven for PostgreSQL Change Data Capture

October 23, 2019
By John Hammink

What happens when PostgreSQL data changes within a table and action needs to be taken in real time?

Most of the time, data’s journey through your company’s systems starts and ends with its initial store. Updates are often done to this canonical store in bulk; in fact, if we’re talking about ETL (Extract, Transform, and Load), these updates may run only periodically (for example, daily or weekly) and, as such, are always behind the present moment.

What if the network times out before this vast data dump is completely transmitted to the receiver? What if a write failure occurs? What if a type incompatibility emerges that renders the snapshot untransmittable? What if this doesn’t happen until minutes, or even hours, into the data transfer?

And, what if the information is needed at a remote point immediately, so that the data can be acted upon in a timely fashion?

The problem

Clearly, periodic bulk updates come with a variety of potential errors. What if, instead of copying the entire database or table during a synchronization operation, only the changes were duplicated?

When only the delta (changed data) is transferred, the operation is much more efficient than simply mirroring or cloning an entire table. What’s more, it can create, in effect, a real-time, streaming transaction log of all changes — one that can be rolled back to bring the datastore, local or remote, to a previous state.

CDC then

But first, let’s look at how changes were captured in the past. These approaches generally relied on backing up all of the available data in the database, which could be quite cumbersome: typically, a nightly dump of some or all tables in a relational database, done with a utility like pg_dump or related tools. Another possibility was to run a batch-based (and possibly proprietary) nightly ETL from multiple databases into a single system.

The PostgreSQL COPY command made this a bit less onerous, but typically still involved sending a large amount of data across the network.
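
To give a feel for this older style, a nightly dump might look something like the sketch below; the database, table and file names here are placeholders, not taken from the original post.

# Dump the whole database (or a single table with -t) to a file for the nightly load
pg_dump mydatabase > nightly_dump.sql
 
# Or, from within psql, export one table's contents with COPY
\copy my_table TO 'my_table.csv' CSV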

Another commonly used approach involved a timestamp, sequence, or status column that gets updated. For example, a column, updated_timestamp, may be added to your table and then read periodically to find the changed rows. The same thing can be done with an updated boolean column. These approaches generally work well enough (consider Confluent’s Kafka JDBC connector), but some implementations have limitations around noticing deletes or updates.

Here’s how some of these approaches might look. First, the table, with updates:

CREATE TABLE source_table (
 id SERIAL PRIMARY KEY,
 important_data text NOT NULL,
 create_time TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
 update_time TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
 updated BOOLEAN NOT NULL DEFAULT FALSE
);
 
ALTER TABLE source_table REPLICA IDENTITY FULL;
 
INSERT INTO source_table (important_data)
 VALUES ('first bit of very important analytics data');
 
INSERT INTO source_table (important_data)
 VALUES ('second bit of very important analytics data');

Next, some example queries that differentiate on these columns, using the different approaches:

-- Sequence-based: read rows by id, starting from the last processed id
SELECT * FROM source_table
 WHERE id >= 0
 ORDER BY id ASC LIMIT 1;
 
-- Timestamp-based: read rows changed since the last processed timestamp y
SELECT * FROM source_table
 WHERE update_time >= y
 ORDER BY update_time ASC LIMIT 1;
 
-- Status-column-based: read the next unprocessed row...
SELECT * FROM source_table
 WHERE updated IS FALSE
 ORDER BY id LIMIT 1;
 
-- ...and mark it as processed, returning the row that was claimed
UPDATE source_table SET updated = 't'
 WHERE id = (SELECT id FROM source_table
             WHERE updated IS FALSE
             ORDER BY id ASC LIMIT 1)
 RETURNING *;

NOTE: The RETURNING clause in PostgreSQL lets an INSERT, UPDATE, or DELETE statement return the values of any columns from the rows it affected, right after the statement runs.
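
For example, against the table above, an INSERT can hand back the generated id and timestamp in one round trip (the inserted value is just an added illustration):

INSERT INTO source_table (important_data)
 VALUES ('third bit of very important analytics data')
 RETURNING id, create_time;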

Trigger-based approaches to Change Data Capture

With a trigger-based approach, you’d create change tables that contain rows that were INSERTed, UPDATEd, or DELETEd; in other words, those rows created with DML (data manipulation language).

Here’s how something like this would work:

CREATE TRIGGER store_changes
 AFTER INSERT OR UPDATE OR DELETE ON source_table
 FOR EACH ROW EXECUTE PROCEDURE store_change();

Subsequently, the trigger simply INSERTs the contents of the change into a change table, adding a column that indicates whether it was an INSERT, UPDATE, or DELETE. These changes are read, row by row, and applied serially — akin to a commit log — in another database.
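
The store_change() function itself isn’t shown above; here is a minimal sketch of what it and its change table might look like. The table layout and the JSONB column are assumptions for illustration, and in practice the function must exist before the trigger that calls it is created.

-- Illustrative change table; real implementations vary
CREATE TABLE source_table_changes (
 change_id BIGSERIAL PRIMARY KEY,
 operation TEXT NOT NULL,           -- 'INSERT', 'UPDATE' or 'DELETE'
 changed_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp(),
 row_data JSONB                     -- the affected row, serialized
);
 
CREATE OR REPLACE FUNCTION store_change() RETURNS trigger AS $$
BEGIN
 IF TG_OP = 'DELETE' THEN
  -- For deletes, the OLD row is all we have
  INSERT INTO source_table_changes (operation, row_data)
   VALUES (TG_OP, to_jsonb(OLD));
  RETURN OLD;
 ELSE
  INSERT INTO source_table_changes (operation, row_data)
   VALUES (TG_OP, to_jsonb(NEW));
  RETURN NEW;
 END IF;
END;
$$ LANGUAGE plpgsql;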

Historically, there have been many homespun solutions for this, and more mature open-source projects like Slony and PGQ (Londiste) are notable, well-tested alternatives. Unfortunately, there are some drawbacks: this sort of approach generally has fairly poor performance, since all writes to the database are multiplied. What’s more, DDL (Data Definition Language) constructs such as ALTER TABLE or CREATE TABLE are, arguably, not handled gracefully.

In summary then, trigger-based CDC has the upside that it can be accomplished within the DB itself using SQL, but with the downside of a notable performance impact.

That’s trigger-based CDC, but what about CDC via transaction log?

Enter built-in Change Data Capture

In the database world, change data capture (CDC) is a set of software design patterns used to determine (and track) the data that has changed so that action can be taken using the changed data.

As it turns out, PostgreSQL already has built-in functionality for this. PostgreSQL maintains a WAL (Write-Ahead Log) of all the changes that are made. The WAL is typically used for crash recovery: replaying its transactions brings the database back to a desired state.

The WAL, as it turns out, is the actual source of truth about the state of a PostgreSQL instance. You can think of the PostgreSQL tables as a queryable cache of the log.

Solving some of the problems: CDC via logical decoding

A modern approach allows PostgreSQL to keep track of all the database changes by decoding the WAL. PostgreSQL supports multiple logical decoding output plugins that can emit the data in different formats. This approach is by far the most performant of the ones we’ve covered so far: there is no need for double writes, because it relies on the WAL that PostgreSQL was already writing for crash recovery.

CDC via logical decoding can track all DML (INSERT, UPDATE, DELETE) changes and allows reading of a specific subset of changes. In fact, a unit of change is merely a committed row of data.

CDC via logical decoding can easily be turned on by setting the following parameters in postgresql.conf:

wal_level = logical
max_replication_slots = 10 # at least one
max_wal_senders = 10 # at least one

Then, create a role with the REPLICATION privilege (for example, from psql):

CREATE ROLE foo REPLICATION LOGIN;

It’s good to note, though, that prior to PostgreSQL 10 you may also need some changes to your pg_hba.conf.
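
On those older versions, that typically means allowing the new role to make replication connections. A line along these lines would go into pg_hba.conf; the address range and auth method below are placeholders to adjust for your environment.

# TYPE  DATABASE      USER   ADDRESS       METHOD
host    replication   foo    10.0.0.0/8    md5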

This sort of approach is supported by multiple PostgreSQL drivers (including JDBC and Python’s psycopg2), and its use extends beyond CDC to replication and auditing. That having been said, there are some notable shortcomings with this approach as well. For one, CDC via logical decoding cannot replicate DDL changes such as table column additions. There are workarounds, such as setting up event triggers to have your replication system run the associated DDL (a sketch follows after the next paragraph), but PostgreSQL doesn’t natively support this yet.

Also, the available output plugins do not support all data types, and losing a node isn’t handled gracefully, since replication slots are not transferred over to standbys on failover. A final limitation is that a given change stream is limited to a single logical database, though you can have as many parallel streams going on as you wish.
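
As for the DDL workaround mentioned above, a minimal sketch of an event trigger that records DDL commands for later replay might look like this; the table and function names are illustrative, not part of any standard.

-- Hypothetical DDL log table; a replication system could replay what lands here
CREATE TABLE ddl_log (
 id BIGSERIAL PRIMARY KEY,
 command_tag TEXT NOT NULL,
 logged_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp()
);
 
CREATE OR REPLACE FUNCTION log_ddl() RETURNS event_trigger AS $$
BEGIN
 -- TG_TAG holds the command tag, e.g. 'ALTER TABLE'
 INSERT INTO ddl_log (command_tag) VALUES (TG_TAG);
END;
$$ LANGUAGE plpgsql;
 
CREATE EVENT TRIGGER capture_ddl ON ddl_command_end
 EXECUTE PROCEDURE log_ddl();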

Let’s look at some variants of this approach:

CDC with pg_recvlogical and wal2json

Developed by Euler Taveira de Oliveira and hosted on GitHub, wal2json works by decoding logical changes into JSON format. While limited to the data types that JSON normally handles, wal2json is supported by multiple DBaaS vendors, including Aiven and Amazon Web Services RDS. And wal2json works with the Debezium plug-in for Apache Kafka, which we’ll discuss later.

pg_recvlogical is a command-line tool for receiving logical changes. When pg_recvlogical is configured together with wal2json, all logical changes can be received and written to a file. While this is great for simple use cases, there is one flaw: there is only a single receiver of data writing changes to a single file – if that file is lost, then your entire log of changes is also lost.
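
In practice, that setup is just two commands; here is a rough sketch in which the database, slot and file names are placeholders.

# Create a replication slot that decodes changes with the wal2json output plugin
pg_recvlogical -d mydatabase --slot cdc_slot --create-slot -P wal2json
 
# Stream decoded changes from the slot into a local file
pg_recvlogical -d mydatabase --slot cdc_slot --start -o pretty-print=1 -f changes.json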

CDC, where changes are read within an application

As with the previous approach, there is just a single receiver of the data. Unlike before, though, instead of using a separate “consumer” of sorts, we receive all logical changes from PostgreSQL directly in one or more applications. This is probably the simplest of these approaches, and it allows the data to be transformed in transit as well as acted on in real time. However, as there is just a single receiver of the data, the same bottleneck exists: this time, if the node hosting your application goes down, so does your capacity to process that change data.
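
One simple way for an application to do this is through PostgreSQL’s SQL interface to logical decoding, sketched below; the slot name is a placeholder, and drivers such as psycopg2 also expose the streaming replication protocol directly.

-- Create a logical replication slot that decodes changes with wal2json
SELECT * FROM pg_create_logical_replication_slot('app_slot', 'wal2json');
 
-- Read and consume the decoded changes
-- (pg_logical_slot_peek_changes() reads without consuming)
SELECT * FROM pg_logical_slot_get_changes('app_slot', NULL, NULL);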

So that’s CDC using a transaction log. The upsides?

  1. Minimal performance impact on the master database;
  2. No changes required to the application or database tables.

For this reason many consider log-based CDC to be the approach of choice when compared to trigger-based CDC.

This approach may continue to present some challenges, e.g. resource locks or performance hits resulting from simultaneous reads of a single WAL. What then?

CDC, where a streaming platform is used

There’s a difference between having N applications consume data directly from PostgreSQL and using a streaming platform.

When every consumer application separately touches the PostgreSQL instance, there’s likely going to be a performance hit on the PostgreSQL instance, which must still consume memory and CPU to maintain its state while also serving the WAL. Not to mention there may be contention when multiple applications attempt to access PostgreSQL’s WAL.

Contrast that with using Apache Kafka to handle serving data to the consumer applications: in this scenario, Apache Kafka producers send changes to Kafka, where decoupled consumer applications can consume those changes at their own pace.

In this approach, the data is first read from PostgreSQL and immediately written to a distributed streaming platform, like Apache Kafka.

As changes are written to a distributed system, the change data gets automatically replicated N times.

[Diagram: data read from PostgreSQL and written to Kafka]

This solves the problem of having a single point of failure that the other approaches have. In general, the use of a streaming platform allows an arbitrary number of readers to access the same data and enjoy easy post-processing.

While more complex processing is now easily possible, you’ve added another distributed system to the mix, which will increase the complexity of your overall solution quite a bit. Be sure that you require this sort of functionality before going this route.

Why Apache Kafka?

Apache Kafka is a distributed streaming platform that supports a publish/subscribe model of information transmission. By spreading data into topics, which are further split into partitions, it’s possible to enhance fault tolerance while writing once and fanning out to possibly many consumers – all while controlling consumers’ access to the topics themselves.
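
For example, creating a topic with several partitions and replicas might look like the sketch below; the topic name and counts are illustrative, and older Kafka releases use a --zookeeper flag instead of --bootstrap-server.

# Create a topic with three partitions, each replicated to three brokers
kafka-topics.sh --create --topic postgres_changes \
  --partitions 3 --replication-factor 3 \
  --bootstrap-server localhost:9092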

Apache Kafka is reliable, scalable and meant for streaming data. Seen by many as a natural “message bus”, it comes with a huge ecosystem of tools to handle streaming data from different databases. Apache Kafka can work even better when enhanced specifically for CDC.

Introducing Debezium

Debezium is such an enhancement. It’s an abstraction layer, built atop Kafka, that allows your applications or messaging services to respond directly to inserts, updates, and deletes (DML) from your PostgreSQL instance. Debezium acts like a monitoring daemon which scans the target database for row-level changes, which are streamed in the order committed.

Debezium is built on Apache Kafka Connect and provides connectors for MySQL, MongoDB, PostgreSQL, Oracle, SQL Server and Cassandra. For PostgreSQL, Debezium uses logical replication to replicate a stream of changes to a Kafka topic; as it uses log compaction, it need only keep the latest value if topics are pre-created. Debezium can run custom transformation code on the data as it is received and supports PostgreSQL’s built-in pgoutput format, the protobuf output plugin or wal2json, which we discussed earlier.
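
To give a feel for the setup, registering a PostgreSQL connector with Kafka Connect might look roughly like this. The hostnames, credentials and names are placeholders, and the exact property names vary between Debezium versions, so check the Debezium documentation for yours.

# Register a Debezium PostgreSQL connector via the Kafka Connect REST API (default port 8083)
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "my-postgres-host",
    "database.port": "5432",
    "database.user": "foo",
    "database.password": "secret",
    "database.dbname": "mydatabase",
    "database.server.name": "pg_server",
    "plugin.name": "wal2json",
    "slot.name": "debezium_slot",
    "table.whitelist": "public.source_table"
  }
}'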

Debezium itself is frequently the basis for Kafka-centric architectures, so you don’t need to know ahead of time how, when or by whom the data will be consumed. As it works with Kafka Connect, there is no shortage of other connectors to send the data on to the next system or store. And, of course, it works with PostgreSQL changes in real time.
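
Once a connector like the sketch above is running, the change events for the example table could be inspected with Kafka’s console consumer; the topic name below follows Debezium’s server.schema.table convention and reuses the placeholder names from that sketch.

# Read all change events recorded so far for public.source_table
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic pg_server.public.source_table --from-beginning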

Some Debezium gotchas

Debezium support is much more mature with Apache Kafka Connect than with PostgreSQL, but the technology can now be used to connect the two.

It’s important to note that when PostgreSQL master failover occurs, the PostgreSQL replication slot disappears; this prompts the need to recreate state. Topics that are not pre-created use DELETE and not COMPACT as the cleanup policy, so this affects data persistence. And from the PostgreSQL side, one must remember to set REPLICA IDENTITY FULL to see UPDATE and DELETE changes.

Next, let’s look at how this approach can be enhanced with logical replication.

CDC and logical replication

Logical replication is a feature built into PostgreSQL. While it doesn’t allow for data transformations in transit, and its most common use case is replicating data to larger data warehouses for analytics workloads, it’s useful for a number of other things as well.

Logical replication allows you to replicate all or a subset of your data. While limited to a single database per replication connection, and requiring superuser privileges, it can theoretically allow no-downtime migrations.
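
At its core, logical replication is driven by publications and subscriptions; here is a minimal sketch in which the connection details are placeholders.

-- On the source database: publish changes to one table (or FOR ALL TABLES)
CREATE PUBLICATION my_publication FOR TABLE source_table;
 
-- On the destination database: subscribe to that publication
CREATE SUBSCRIPTION my_subscription
 CONNECTION 'host=source-host dbname=mydatabase user=foo password=secret'
 PUBLICATION my_publication;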

Recap and wrapping up

Logical decoding and replication have revolutionized the way CDC can be done with PostgreSQL. We’re only seeing the beginnings of their adoption, and logical decoding, for example, isn’t a perfect solution yet. However, Apache Kafka and Debezium, which is built atop Kafka, are natural fits for capturing PostgreSQL database changes in real time and streaming the data onward.

Aiven for PostgreSQL provides built-in logical replication for change data capture, and we’re one of the few PostgreSQL cloud providers that currently does. We also support Apache Kafka Connect as a service, which offers a combination of approaches (including logical decoding, logical replication and Debezium) as a useful array of choices for replicating your row-level PostgreSQL changes to almost anywhere.

If you want to find out more about our PostgreSQL offering, you can find the pricing here. You can also follow our blog, changelog RSS feeds, or follow us on Twitter or LinkedIn.

