Consistent Change Data Capture across multiple tables with PostgreSQL® logical decoding, the outbox pattern and Debezium

Follow along to implement the outbox pattern in PostgreSQL® and create a change data capture workflow that doesn't create duplicate data!

Change data capture across multiple tables with PostgreSQL® logical decoding messages and Debezium

Change data capture (CDC) is a widely adopted pattern to move data across systems. While the basic principle works well on small single table use-cases, things get complicated when we need to take into account consistency when information spans multiple tables. In cases like this, creating multiple 1-1 CDC flows is not enough to guarantee a consistent view of the data in the database because each table is tracked separately. Aligning data with transaction boundaries becomes a hard and error prone problem to be solve once the data left the database.

This tutorial shows how to use PostgreSQL® logical decoding, the outbox pattern and Debezium to propagate a consistent view of a dataset spanning over multiple tables.

Use case: A PostgreSQL based online shop

Relational databases are based on an entity-relationship model, where entities are stored in tables, with each table having a key for uniqueness. Relationships take the form of foreign keys, that allow information from various tables to be joined.

A practical example is the following with the three entities users, products, orders, and order lines and the relationships within them.

Architectural diagram of the entity-relationship model

In the above picture, the orders table contains a foreign key to users (the user making the order), and the order lines table contains the foreign keys to orders and products allowing to understand to which order the line belongs and which products it includes.

We can recreate the above situation by signing up for an Aiven account and accessing the console, then creating a new Aiven for PostgreSQL® database. When the service is up and running, we can retrieve the connection URI from the service console page's Overview tab.

When you have the connection URI, connect with psql and run the following:

CREATE TABLE USERS (ID SERIAL PRIMARY KEY, USERNAME TEXT); INSERT INTO USERS (USERNAME) VALUES ('Franco'),('Giuseppina'),('Wiltord'); CREATE TABLE ORDERS ( ID SERIAL PRIMARY KEY, SHIPPING_ADDR TEXT, ORDER_DATE DATE, USER_ID INT, CONSTRAINT FK_USER FOREIGN KEY(USER_ID) REFERENCES USERS(ID) ); INSERT INTO ORDERS (SHIPPING_ADDR, ORDER_DATE, USER_ID) VALUES ('Via Ugo 1', '02/08/2023',3), ('Piazza Carlo 2', '03/08/2023',1), ('Lincoln Street', '03/08/2023',2); CREATE TABLE PRODUCTS ( ID SERIAL PRIMARY KEY, CATEGORY TEXT, NAME TEXT, PRICE INT ); INSERT INTO PRODUCTS (CATEGORY, NAME, PRICE) VALUES ('t-shirt', 'red t-shirt',5), ('shoes', 'Wow shoe',35), ('t-shirt', 'blue t-shirt',15), ('dress', 'white-golden dress',50); CREATE TABLE ORDER_LINES ( ID SERIAL PRIMARY KEY, ORDER_ID INT, PROD_ID INT, QTY INT, CONSTRAINT FK_ORDER FOREIGN KEY(ORDER_ID) REFERENCES ORDERS(ID), CONSTRAINT FK_PRODUCT FOREIGN KEY(PROD_ID) REFERENCES PRODUCTS(ID) ); INSERT INTO ORDER_LINES (ORDER_ID, PROD_ID, QTY) VALUES (1,1,5), (1,4,1), (2,2,7), (2,4,2), (2,3,7), (2,1,1), (3,2,2);

Start the Change Data Capture flow with the Debezium connector

Now, if we want to send an event to Apache Kafka® every time a new order happens we can define a Debezium CDC connector that includes all four tables defined above.

To do this, navigate to the Aiven Console and create a new Aiven for Apache Kafka® service (we need at least a business plan for this example). Then enable Kafka Connect from the service overview page. Navigate to the bottom of the same page, we can and enable the kafka.auto_create_topics_enable configuration in the Advanced parameter section for our test purposes. Finally, when the service is up and running create a Debezium CDC connector with the following JSON definition:

{ "name": "mysourcedebezium", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "<HOSTNAME>", "database.port": "<PORT>", "database.user": "avnadmin", "database.password": "<PASSWORD>", "database.dbname": "defaultdb", "database.server.name": "mydebprefix", "plugin.name": "pgoutput", "slot.name": "mydeb_slot", "publication.name": "mydeb_pub", "publication.autocreate.mode": "filtered", "table.include.list": "public.users,public.products,public.orders,public.order_lines" }

Where:

  • database.hostname, database.port, database.password are pointing to the Aiven for PostgreSQL connection parameters that can be found in the Aiven Console's service overview tab
  • database.server.name is the prefix for the topic names in Aiven for Apache Kafka
  • plugin.name is the PostgreSQL plugin name used pgoutput
  • slot.name and publication.name are the name of the replication slot and publication in PostgreSQL
  • "publication.autocreate.mode": "filtered" allows us to create a publication only for the tables in scope
  • table.include.list lists the tables for which we want to enable CDC

The connector will create four topics (one per table) and tracks the changes separately for each table.

Debezium to Kafka

In Aiven for Apache Kafka we should see four different topics named <prefix>.<schema_name>.<table_name> where:

  • <prefix> matches the database.server.name parameter (mydebprefix)
  • <schema_name> matches the name of the schema (public in our scenario)
  • <table_name> matches the name of the tables (users, products, orders, and order_lines)

If we check with kcat, the mydebprefix.public.users log in Apache Kafka, we should see data similar to the below

{"before":null,"after":{"id":1,"username":"Franco"},"source":{"version":"1.9.7.aiven","connector":"postgresql","name":"mydebprefix1","ts_ms":1690794104325,"snapshot":"true","db":"defaultdb","sequence":"[null,\"251723832\"]","schema":"public","table":"users","txId":2404,"lsn":251723832,"xmin":null},"op":"r","ts_ms":1690794104585,"transaction":null} {"before":null,"after":{"id":2,"username":"Giuseppina"},"source":{"version":"1.9.7.aiven","connector":"postgresql","name":"mydebprefix1","ts_ms":1690794104325,"snapshot":"true","db":"defaultdb","sequence":"[null,\"251723832\"]","schema":"public","table":"users","txId":2404,"lsn":251723832,"xmin":null},"op":"r","ts_ms":1690794104585,"transaction":null} {"before":null,"after":{"id":3,"username":"Wiltord"},"source":{"version":"1.9.7.aiven","connector":"postgresql","name":"mydebprefix1","ts_ms":1690794104325,"snapshot":"true","db":"defaultdb","sequence":"[null,\"251723832\"]","schema":"public","table":"users","txId":2404,"lsn":251723832,"xmin":null},"op":"r","ts_ms":1690794104585,"transaction":null}

The above is the typical Debezium data representation with the before and after representations, as well as information about the transactions (ts_ms as example) and the data source (schema, table and others). This rich information will be ueful later.

The consistency problem

Now let's say Franco, one of our users, decides to issue a new order for the white-golden dress. Just few seconds later, our company, due to an online debate decides that the white-golden dress is now called blue-black dress and wants to charge 65$$ instead of the 50$$ original price.

Entity relationship model with the updates to the order, order-lines and products table.

The above two actions can be represented by the following two transactions in PostgreSQL:

--- Franco purchasing the white-golden dress BEGIN; INSERT INTO ORDERS (SHIPPING_ADDR, ORDER_DATE, USER_ID) VALUES ('Piazza Carlo 2', '04/08/2023',1); INSERT INTO ORDER_LINES (ORDER_ID, PROD_ID, QTY) VALUES (4,4,1); END; --- Our company updating name and the price of the white-golden dress BEGIN; UPDATE PRODUCTS SET NAME = 'blue-black dress', PRICE = 65 WHERE ID = 4; END;

At all points in time we can get the order details with the following query:

SELECT USERNAME, ORDERS.ID ORDER_ID, PRODUCTS.NAME PRODUCT_NAME, PRODUCTS.PRICE PRODUCT_PRICE, ORDER_LINES.QTY QUANTITY FROM USERS JOIN ORDERS ON USERS.ID = ORDERS.USER_ID JOIN ORDER_LINES ON ORDERS.ID = ORDER_LINES.ORDER_ID JOIN PRODUCTS ON ORDER_LINES.PROD_ID = PRODUCTS.ID WHERE ORDERS.ID = 4;

If we issue the query just after Franco's order was inserted, but before the product update, this results in the correct order details:

username | order_id | product_name | product_price | quantity ----------+----------+--------------------+---------------+---------- Franco | 4 | white-golden dress | 50 | 1 (1 row)

If we issued the same query after the product update, this results in the blue-black dress being in the order and Franco being upcharged by extra `15$$.

username | order_id | product_name | product_price | quantity ----------+----------+------------------+---------------+---------- Franco | 4 | blue-black dress | 65 | 1 (1 row)

Recreate consistency in Apache Kafka

When we look at the data in Apache Kafka, we can see all the changes in the topics. Browsing the mydebprefix.public.order_lines topic with kcat, we can check the new entry (the results in mydebprefix.public.orders would be similar):

{"before":null,"after":{"id":8,"order_id":4,"prod_id":4,"qty":1},"source":{"version":"1.9.7.aiven","connector":"postgresql","name":"mydebprefix1","ts_ms":1690794206740,"snapshot":"false","db":"defaultdb","sequence":"[null,\"251744424\"]","schema":"public","table":"order_lines","txId":2468,"lsn":251744424,"xmin":null},"op":"c","ts_ms":1690794207231,"transaction":null}

And in mydebprefix.public.products, we can see an entry like the following, showcasing the update from white-golden dress to blue-black dress and related price change:

{"before":{"id":4,"category":"dress","name":"white-golden dress","price":50},"after":{"id":4,"category":"dress","name":"blue-black dress","price":65},"source":{"version":"1.9.7.aiven","connector":"postgresql","name":"mydebprefix1","ts_ms":1690794209729,"snapshot":"false","db":"defaultdb","sequence":"[\"251744720\",\"251744720\"]","schema":"public","table":"products","txId":2469,"lsn":251744720,"xmin":null},"op":"u","ts_ms":1690794210275,"transaction":null}

The question now is: How can we keep the order consistent with reality, where Franco purchased the white-golden dress for `50$$?

As mentioned before, the Debezium format stores lots of metadata in addition to the change data. We could make use of the transaction's metadata (txId, lsn and ts_ms for example) and additional tools like Aiven for Apache Flink® to recreate a consistent view of the transaction via stream processing. This solution requires additional tooling that might not be in scope for us, however.

Use the outbox pattern in PostgreSQL

An alternative solution that doesn't require additional tooling is to propagate a consistent view of the data using an outbox pattern built in PostgreSQL. With the outbox pattern we store, alongside the original set of tables, an additional table which consolidates the information. With this pattern we can update both the original table and the outbox one within a transaction.

Architectural diagram showcasing the outbox table

Add a new outbox table in PostgreSQL

How do we implement the outbox pattern in PostgreSQL? The first option is to add a new dedicated table and update it within the same transaction changing the ORDERS and ORDER_LINES tables. We can define the outbox table as follows:

CREATE TABLE ORDER_OUTBOX ( ORDER_LINE_ID INT, ORDER_ID INT, USERNAME TEXT, PRODUCT_NAME TEXT, PRODUCT_PRICE INT, QUANTITY INT );

We can then add the ORDER_OUTBOX table in the table.include.list parameter for the Debezium Connector to track its changes. The last part of the equation is to update the outbox table at every order: if Giuseppina wants 5 red t-shirts, the transaction will need to change the ORDERS, ORDER_LINES and ORDER_OUTBOX tables like the following:

BEGIN; INSERT INTO ORDERS (ID, SHIPPING_ADDR, ORDER_DATE, USER_ID) VALUES (5, 'Lincoln Street', '05/08/2023',2); INSERT INTO ORDER_LINES (ORDER_ID, PROD_ID, QTY) VALUES (5,1,5); INSERT INTO ORDER_OUTBOX SELECT ORDER_LINES.ID, ORDERS.ID, USERNAME, NAME PRODUCT_NAME, PRICE PRODUCT_PRICE, QTY QUANTITY FROM USERS JOIN ORDERS ON USERS.ID = ORDERS.USER_ID JOIN ORDER_LINES ON ORDERS.ID = ORDER_LINES.ORDER_ID JOIN PRODUCTS ON ORDER_LINES.PROD_ID = PRODUCTS.ID WHERE ORDERS.ID=5; END;

With this transaction and the Debezium configuration change to include the public.order_outbox table in the CDC, we end up with a new topic called mydebprefix.public.order_outbox. It has the following data, which represents the consistent situation in PostgreSQL:

{"before":null,"after":{"order_line_id":12,"order_id":5,"username":"Giuseppina","product_name":"red t-shirt","product_price":5,"quantity":5},"source":{"version":"1.9.7.aiven","connector":"postgresql","name":"mydebprefix1","ts_ms":1690798353655,"snapshot":"false","db":"defaultdb","sequence":"[\"251744920\",\"486544200\"]","schema":"public","table":"order_outbox","txId":4974,"lsn":486544200,"xmin":null},"op":"c","ts_ms":1690798354274,"transaction":null}

This approach emits a new entry for every order line. We could also aggregate the outbox table at order level by, for example, adding the order lines information in a nested JSONB object.

Avoid the additional table with PostgreSQL logical decoding

The main problem with the outbox table approach is that we're storing the same information twice: once in the original tables and once in the outbox table. This doubles the storage needs, and the original applications that use the database generally not access it, making this an inefficient approach.

A better, transactional approach, is to use PostgreSQL logical decoding. Created originally for replication purposes, PostgreSQL logical decoding can also write custom information to the WAL log. Instead of re-storing the result of the joined data in another PostgreSQL table, we can emit the result as an entry to the WAL log. By doing it within a transaction, we can benefit from the transaction isolation therefore the entry in the log is committed only if the whole transaction is.

Architecture diagram with PostgreSQL logical decoding messages

To use PostgreSQL logical decoding messages for our outbox pattern needs, we need to execute the following:

BEGIN; DO $$ DECLARE JSON_ORDER text; begin INSERT INTO ORDERS (ID, SHIPPING_ADDR, ORDER_DATE, USER_ID) VALUES (6, 'Via Ugo 1', '05/08/2023',3); INSERT INTO ORDER_LINES (ORDER_ID, PROD_ID, QTY) VALUES (6,4,2),(6,3,3); SELECT JSONB_BUILD_OBJECT( 'order_id', ORDERS.ID, 'order_lines', JSONB_AGG( JSONB_BUILD_OBJECT( 'order_line', ORDER_LINES.ID, 'username', USERNAME, 'product_name', NAME, 'product_price',PRICE, 'quantity', QTY))) INTO JSON_ORDER FROM USERS JOIN ORDERS ON USERS.ID = ORDERS.USER_ID JOIN ORDER_LINES ON ORDERS.ID = ORDER_LINES.ORDER_ID JOIN PRODUCTS ON ORDER_LINES.PROD_ID = PRODUCTS.ID WHERE ORDERS.ID=6 GROUP BY ORDERS.ID; SELECT * FROM pg_logical_emit_message(true,'myprefix',JSON_ORDER) into JSON_ORDER; END; $$; END;

Where:

  • The two lines below insert the new order into the original tables
INSERT INTO ORDERS (ID, SHIPPING_ADDR, ORDER_DATE, USER_ID) VALUES (6, 'Via Ugo 1', '05/08/2023',3); INSERT INTO ORDER_LINES (ORDER_ID, PROD_ID, QTY) VALUES (6,4,2),(6,3,3);

Next, we need to construct a SELECT that:

  • Gets the new order details from the source tables
  • Creates a unique JSON document (stored in the JSON_ORDER variable) for the entire order and stores the results in an array for each line of the order
  • Emits this as a logical message to the WAL file.

The SELECT statement looks like the following:

SELECT * FROM pg_logical_emit_message(true,'outbox',JSON_ORDER) into JSON_ORDER;

pg_logical_emit_message has three arguments. The first, true, defines this operation as a part of a transaction. myprefix defines the message prefix, and JSON_ORDER is the content of the message.

The emitted JSON document should look similar to:

{"order_id": 6, "order_lines": [{"quantity": 2, "username": "Wiltord", "order_line": 19, "product_name": "blue-black dress", "product_price": 65}, {"quantity": 3, "username": "Wiltord", "order_line": 20, "product_name": "blue t-shirt", "product_price": 15}]}

If the above transaction is successful, we should see a new topic named mydebprefix.message that contains the logical message that we just pushed, the form should be the following:

{"op":"m","ts_ms":1690804437953,"source":{"version":"1.9.7.aiven","connector":"postgresql","name":"mydebmsg","ts_ms":1690804437778,"snapshot":"false","db":"defaultdb","sequence":"[\"822085608\",\"822089728\"]","schema":"","table":"","txId":8651,"lsn":822089728,"xmin":null},"message":{"prefix":"myprefix","content":"eyJvcmRlcl9pZCI6IDYsICJvcmRlcl9saW5lcyI6IFt7InF1YW50aXR5IjogMiwgInVzZXJuYW1lIjogIldpbHRvcmQiLCAib3JkZXJfbGluZSI6IDI1LCAicHJvZHVjdF9uYW1lIjogImJsdWUtYmxhY2sgZHJlc3MiLCAicHJvZHVjdF9wcmljZSI6IDY1fSwgeyJxdWFudGl0eSI6IDMsICJ1c2VybmFtZSI6ICJXaWx0b3JkIiwgIm9yZGVyX2xpbmUiOiAyNiwgInByb2R1Y3RfbmFtZSI6ICJibHVlIHQtc2hpcnQiLCAicHJvZHVjdF9wcmljZSI6IDE1fV19"}}

Where:

  • "op":"m" defines that the event is a logical decoding message
  • "prefix":"myprefix" is the prefix we defined in the pg_logical_emit_message call
  • content contains the JSON document with the order details encoded based on the binary.handling.mode defined in the connector definition.

If we use a mix of kcat and jq to showcase the data included in the message.content part of the payload with:

kcat -b KAFKA_HOST:KAFKA_PORT \ -X security.protocol=SSL \ -X ssl.ca.location=ca.pem \ -X ssl.key.location=service.key \ -X ssl.certificate.location=service.crt \ -C -t mydebmsg.message -u | jq -r '.message.content | @base64d'

We see the message in JSON format as:

{"order_id": 6, "order_lines": [{"quantity": 2, "username": "Wiltord", "order_line": 25, "product_name": "blue-black dress", "product_price": 65}, {"quantity": 3, "username": "Wiltord", "order_line": 26, "product_name": "blue t-shirt", "product_price": 15}]}

Conclusion

Defining a change data capture system allows downstream technologies to make use of the information assets is useful only if we can provide a consistent view on top of the data. The outbox pattern allows us to join data spanning different tables and provide a consistent, up to date view of complex queries.

PostgreSQL's logical decoding enables us to push such consistent view to Apache Kafka without having to write changes into an extra outbox table but rather by writing directly to the WAL log.

Further reading