Change data capture across multiple tables with PostgreSQL® logical decoding messages and Debezium
Change data capture (CDC) is a widely adopted pattern to move data across systems. While the basic principle works well for small, single-table use cases, things get complicated when consistency must be maintained across information spanning multiple tables. In cases like this, creating multiple 1-1 CDC flows is not enough to guarantee a consistent view of the data in the database, because each table is tracked separately. Aligning data with transaction boundaries becomes a hard and error-prone problem to solve once the data has left the database.
This tutorial shows how to use PostgreSQL® logical decoding, the outbox pattern and Debezium to propagate a consistent view of a dataset spanning multiple tables.
Debezium 2.5
This article describes the configuration for Debezium version 2.5 and later.
Use case: A PostgreSQL based online shop
Relational databases are based on an entity-relationship model, where entities are stored in tables and each table has a key guaranteeing uniqueness. Relationships take the form of foreign keys that allow information from various tables to be joined.
A practical example is the following, with the four entities users, products, orders, and order lines, and the relationships between them.
In the above picture, the orders table contains a foreign key to users (the user making the order), and the order lines table contains foreign keys to orders and products, making it possible to tell which order each line belongs to and which product it includes.
When you have the connection URI, connect with psql and run the following:
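A minimal sketch of such a schema, with column names assumed for illustration and a couple of sample rows that we'll reuse later, could be:

```sql
-- Four tables modelling the shop; column names are illustrative assumptions
CREATE TABLE users (
    id   SERIAL PRIMARY KEY,
    name TEXT NOT NULL
);

CREATE TABLE products (
    id    SERIAL PRIMARY KEY,
    name  TEXT NOT NULL,
    price NUMERIC NOT NULL
);

CREATE TABLE orders (
    id         SERIAL PRIMARY KEY,
    user_id    INT NOT NULL REFERENCES users(id),
    created_at TIMESTAMP DEFAULT now()
);

CREATE TABLE order_lines (
    id         SERIAL PRIMARY KEY,
    order_id   INT NOT NULL REFERENCES orders(id),
    product_id INT NOT NULL REFERENCES products(id),
    quantity   INT NOT NULL
);

-- Sample data reused in the rest of the tutorial (the t-shirt price is
-- illustrative; the white-golden dress starts at $50 as described below)
INSERT INTO users (name) VALUES ('Franco'), ('Giuseppina');
INSERT INTO products (name, price) VALUES
    ('white-golden dress', 50),
    ('red t-shirt', 20);
```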
Start the Change Data Capture flow with the Debezium connector
Now, if we want to send an event to Apache Kafka® every time a new order happens, we can define a Debezium CDC connector that includes all four tables defined above.
To do this, navigate to the Aiven Console and create a new Aiven for Apache Kafka® service (we need at least a Business plan for this example, so that we can run Kafka Connect). Then:
Enable Kafka Connect from the Connectors tab of the service overview page.
Navigate to the bottom of the Service Settings tab and enable the kafka.auto_create_topics_enable configuration in the Advanced parameter section - this is not something we'd normally do in production, but it makes sense for our test purposes.
Finally, when the service is up and running, create a Debezium CDC connector with the following JSON definition:
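A definition along these lines should work; the connector name, hostname, port, user, password and database name below are placeholders to replace with the values of your Aiven for PostgreSQL service, and binary.handling.mode is set to base64 so that the logical message content used later in this tutorial is Base64 encoded:

```json
{
    "name": "orders-cdc-connector",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "MY_PG_HOST.aivencloud.com",
    "database.port": "12345",
    "database.user": "avnadmin",
    "database.password": "MY_PG_PASSWORD",
    "database.dbname": "defaultdb",
    "database.sslmode": "require",
    "plugin.name": "pgoutput",
    "slot.name": "myslot",
    "publication.name": "mypublication",
    "publication.autocreate.mode": "filtered",
    "topic.prefix": "mydebprefix",
    "table.include.list": "public.users,public.products,public.orders,public.order_lines",
    "binary.handling.mode": "base64"
}
```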
Where:
database.hostname, database.port, database.password specify the Aiven for PostgreSQL connection parameters that can be found in the Aiven Console's service overview tab
topic.prefix is the prefix for the topic names in Aiven for Apache Kafka
plugin.name is the PostgreSQL plugin to use, pgoutput
slot.name and publication.name are the names of the replication slot and publication in PostgreSQL
"publication.autocreate.mode": "filtered" allows us to create a publication only for the tables in scope
table.include.list lists the tables for which we want to enable CDC
The connector creates four topics (one per table) and tracks the changes of each table separately.
In Aiven for Apache Kafka we should see four different topics named <prefix>.<schema_name>.<table_name> where:
<prefix> matches the topic.prefix parameter (mydebprefix)
<schema_name> matches the name of the schema (public in our scenario)
<table_name> matches the name of the tables (users, products, orders, and order_lines)
If we check the mydebprefix.public.users topic in Apache Kafka with kcat, we should see data similar to the following:
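Assuming the service certificates have been downloaded locally (the broker URI, port and file paths below are placeholders), a kcat invocation along these lines reads the topic:

```bash
kcat -b MY_KAFKA_HOST.aivencloud.com:12345    \
     -X security.protocol=SSL                 \
     -X ssl.ca.location=ca.pem                \
     -X ssl.key.location=service.key          \
     -X ssl.certificate.location=service.cert \
     -t mydebprefix.public.users -C -e
```

Each record value is a Debezium change event; trimmed down and with illustrative values, it looks roughly like this:

```json
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Franco"
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "mydebprefix",
    "ts_ms": 1700000000000,
    "db": "defaultdb",
    "schema": "public",
    "table": "users",
    "txId": 1234,
    "lsn": 123456789
  },
  "op": "c",
  "ts_ms": 1700000000123
}
```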
The above is the typical Debezium data representation, with the before and after states of the row, as well as information about the transaction (ts_ms, for example) and the data source (schema, table and others). This rich information will be useful later.
The consistency problem
Now let's say Franco, one of our users, decides to issue a new order for the white-golden dress. Just a few seconds later our company, prompted by an online debate, decides that the white-golden dress is now called the blue-black dress and wants to charge $65 instead of the original $50 price.
The first action can be represented by the following transaction in PostgreSQL:
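With the sample data sketched earlier (Franco has user id 1, the white-golden dress product id 1), the transaction could be:

```sql
BEGIN;

-- Franco places a new order
INSERT INTO orders (user_id) VALUES (1);

-- one order line: a single white-golden dress
INSERT INTO order_lines (order_id, product_id, quantity)
    VALUES (currval('orders_id_seq'), 1, 1);

COMMIT;
```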
After which we can check the order details with the following query:
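A query joining the four tables, based on the schema sketched earlier, could be:

```sql
SELECT u.name  AS user_name,
       p.name  AS product_name,
       p.price AS price,
       ol.quantity
FROM orders o
    JOIN users u        ON o.user_id     = u.id
    JOIN order_lines ol ON ol.order_id   = o.id
    JOIN products p     ON ol.product_id = p.id
WHERE u.name = 'Franco';
```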
This should report the correct order details: Franco purchasing one white-golden dress at the original $50 price.
The second action can be represented by the following transaction:
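Based on the same sketched schema:

```sql
BEGIN;

-- rename the product and raise its price from $50 to $65
UPDATE products
SET name  = 'blue-black dress',
    price = 65
WHERE name = 'white-golden dress';

COMMIT;
```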
We can then repeat the same query to check the order details.
The result now shows the order as Franco ordering a blue-black dress with an extra $15 cost.
Recreate consistency in Apache Kafka
When we look at the data in Apache Kafka, we can see all the changes in the topics. Browsing the mydebprefix.public.order_lines topic with kcat shows the new entry; the results in mydebprefix.public.orders are similar.
In mydebprefix.public.products we can also see entries showcasing the update from the white-golden dress to the blue-black dress and the related price change.
The question now is: How can we keep the order consistent with reality, where Franco purchased the white-golden dress for $50?
As mentioned before, the Debezium format stores lots of metadata in addition to the change data. We could make use of the transaction's metadata (txId, lsn and ts_ms for example) and additional tools like Aiven for Apache Flink® to recreate a consistent view of the transaction via stream processing. That solution requires additional tooling that might not be in scope for us, however.
Use the outbox pattern in PostgreSQL
An alternative solution that doesn't require additional tooling is to propagate a consistent view of the data using an outbox pattern built in PostgreSQL. With the outbox pattern we store, alongside the original set of tables, an additional table that consolidates the information. This lets us update both the original tables and the outbox table within the same transaction.
Add a new outbox table in PostgreSQL
How do we implement the outbox pattern in PostgreSQL? The first option is to add a new dedicated table and update it within the same transaction that changes the ORDERS and ORDER_LINES tables. We can define the outbox table as follows:
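One possible shape for it, denormalizing the information per order line (column names are assumptions consistent with the schema sketched earlier):

```sql
-- One row per order line, with the joined information already consolidated
CREATE TABLE order_outbox (
    id           SERIAL PRIMARY KEY,
    order_id     INT NOT NULL,
    user_name    TEXT NOT NULL,
    product_name TEXT NOT NULL,
    price        NUMERIC NOT NULL,
    quantity     INT NOT NULL,
    created_at   TIMESTAMP DEFAULT now()
);
```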
We can then add the ORDER_OUTBOX table in the table.include.list parameter for the Debezium Connector to track its changes. The last part of the equation is to update the outbox table at every order: if Giuseppina wants 5 red t-shirts, the transaction will need to change the ORDERS, ORDER_LINES and ORDER_OUTBOX tables like the following:
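With the sample data and the outbox table sketched above (Giuseppina has user id 2, the red t-shirt product id 2), the transaction could be:

```sql
BEGIN;

-- Giuseppina places a new order with 5 red t-shirts
INSERT INTO orders (user_id) VALUES (2);

INSERT INTO order_lines (order_id, product_id, quantity)
    VALUES (currval('orders_id_seq'), 2, 5);

-- write the consolidated view of the same order into the outbox table
INSERT INTO order_outbox (order_id, user_name, product_name, price, quantity)
SELECT o.id, u.name, p.name, p.price, ol.quantity
FROM orders o
    JOIN users u        ON o.user_id     = u.id
    JOIN order_lines ol ON ol.order_id   = o.id
    JOIN products p     ON ol.product_id = p.id
WHERE o.id = currval('orders_id_seq');

COMMIT;
```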
With this transaction and the Debezium configuration change to include the public.order_outbox table in the CDC, we end up with a new topic called mydebprefix.public.order_outbox, whose events represent the consistent situation in PostgreSQL.
This approach emits a new entry for every order line. We could also aggregate the outbox table at order level by, for example, adding the order lines information in a nested JSONB object.
Avoid the additional table with PostgreSQL logical decoding
The main problem with the outbox table approach is that we're storing the same information twice: once in the original tables and once in the outbox table. This doubles the storage needs, and the original applications that use the database generally don't access the outbox table, making this an inefficient approach.
A better, transactional approach is to use PostgreSQL logical decoding messages. Created originally for replication purposes, PostgreSQL logical decoding can also write custom information to the WAL. Instead of re-storing the result of the joined data in another PostgreSQL table, we can emit the result as an entry in the WAL. By doing it within a transaction we benefit from transaction isolation: the entry in the log is committed only if the whole transaction is.
To use PostgreSQL logical decoding messages for our outbox needs, we can execute the following:
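One way to run this from psql, using \gset to store the generated document into a variable (called json_order below, standing in for the article's JSON_ORDER) and reusing the sketched schema, could be:

```sql
BEGIN;

-- 1. insert the new order into the original tables
INSERT INTO orders (user_id) VALUES (2);

INSERT INTO order_lines (order_id, product_id, quantity)
    VALUES (currval('orders_id_seq'), 2, 5);

-- 2. build a single JSON document for the whole order, with one array
--    entry per order line, and store it in the psql variable :json_order
SELECT JSONB_BUILD_OBJECT(
        'order_id',  o.id,
        'user_name', u.name,
        'order_lines', JSONB_AGG(
            JSONB_BUILD_OBJECT(
                'product_name', p.name,
                'price',        p.price,
                'quantity',     ol.quantity
            )
        )
    ) AS json_order
FROM orders o
    JOIN users u        ON o.user_id     = u.id
    JOIN order_lines ol ON ol.order_id   = o.id
    JOIN products p     ON ol.product_id = p.id
WHERE o.id = currval('orders_id_seq')
GROUP BY o.id, u.name \gset

-- 3. emit the document as a logical decoding message to the WAL
SELECT pg_logical_emit_message(true, 'myprefix', :'json_order');

COMMIT;
```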
In the above:
First, we have two INSERT statements that add the new order to the original tables:
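In the sketch above these are:

```sql
INSERT INTO orders (user_id) VALUES (2);

INSERT INTO order_lines (order_id, product_id, quantity)
    VALUES (currval('orders_id_seq'), 2, 5);
```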
Next, we use SELECT and JSONB_BUILD_OBJECT to:
get the new order details from the source tables
create a single JSON document (stored in the JSON_ORDER variable) for the entire order, with the order lines stored in an array
Finally, we need a SELECT statement to emit the JSON_ORDER variable as a logical message to the WAL:
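In the sketch above this is:

```sql
SELECT pg_logical_emit_message(true, 'myprefix', :'json_order');
```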
pg_logical_emit_message has three arguments. The first, true, defines this operation as a part of a transaction. myprefix defines the message prefix, and JSON_ORDER is the content of the message.
The emitted JSON document should look similar to:
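Following the JSONB_BUILD_OBJECT call sketched above (ids and values are illustrative):

```json
{
  "order_id": 3,
  "user_name": "Giuseppina",
  "order_lines": [
    {
      "product_name": "red t-shirt",
      "price": 20,
      "quantity": 5
    }
  ]
}
```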
If the above transaction is successful, we should see a new topic named mydebprefix.message that contains the logical message we just pushed. The form should be similar to the following:
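With illustrative values and a truncated content field, the value payload has roughly this shape:

```json
{
  "op": "m",
  "ts_ms": 1700000001000,
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "mydebprefix",
    "db": "defaultdb",
    "schema": "",
    "table": "",
    "txId": 1240,
    "lsn": 123457000
  },
  "message": {
    "prefix": "myprefix",
    "content": "BASE64_ENCODED_JSON_DOCUMENT"
  }
}
```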
Where:
"op":"m" defines that the event is a logical decoding message
"prefix":"myprefix" is the prefix we defined in the pg_logical_emit_message call
content contains the JSON document with the order details encoded based on the binary.handling.mode defined in the connector definition.
We can use a mix of kcat and jq to extract and decode the data included in the message.content part of the payload:
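A command along these lines does it (same placeholder connection details as before); depending on the converters configured for the connector, the field might be nested under a top-level payload object, in which case the jq path becomes .payload.message.content:

```bash
kcat -b MY_KAFKA_HOST.aivencloud.com:12345    \
     -X security.protocol=SSL                 \
     -X ssl.ca.location=ca.pem                \
     -X ssl.key.location=service.key          \
     -X ssl.certificate.location=service.cert \
     -t mydebprefix.message -C -e \
  | jq '.message.content | @base64d | fromjson'
```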
The result is the message decoded back into plain JSON: the same document we emitted from PostgreSQL, with the complete order details in a single place.
Conclusion
Defining a change data capture system that allows downstream technologies to make use of information assets is useful only if we can provide a consistent view of the data. The outbox pattern allows us to join data spanning different tables and provide a consistent, up-to-date view of complex queries.
PostgreSQL's logical decoding messages enable us to push such a consistent view to Apache Kafka without having to write changes into an extra outbox table, but rather by writing directly to the WAL.