Add caching to your PostgreSQL® app using Valkey™
Caching is used to speed up cloud applications, particularly for database reads. Read on to learn more, and find out how to build caching with Valkey™ into a simple PostgreSQL® web app.
What we'll learn
- How to write a simple Python web application that accesses data in a PostgreSQL® database
- How to use curl at the command line to make GET and POST requests to that web application
- Why caching the GET response is a good idea, and how to do that with Valkey™, a key-value store using the Redis serialization protocol
- Some basics about cache invalidation - how to make sure the cache doesn't get out-of-date
Overview
With any sufficiently complex application, performance becomes a primary concern. One of the key metrics for measuring the performance of any software is the speed of reading from and writing to its database.
Most applications repeatedly store (write) and retrieve (read) some kind of data. However, in most cases the number of reads far exceeds the number of writes. To make this more efficient, we use a caching layer. Caching is the act of writing to a location (for instance, a block of memory) specifically designed for quick retrieval of common requests.
For example, imagine a customer creating a profile on an online store. The customer fills out their name, phone number, and address, which we store to our database. Because these pieces of information are required at multiple points in the checkout process, it's worth storing them in a cache the first time we retrieve them from the database. This speeds up application processing time for all following retrievals. The difference in retrieval speed for any single request might be mere milliseconds, but when we develop applications for the cloud, for millions of users simultaneously, those milliseconds add up.
Caching cloud applications introduces some complexities but also offers opportunities for optimization. Instead of using a block of memory (and the unbounded, chaotic nature that entails), we can use two databases! We can use one database as a data store, and one as a cache. Further, we can choose a data store that is optimized for things like concurrency control, and a cache that is optimized for speedy reads and writes, while still taking advantage of everything the cloud offers us in terms of scalability.
Prerequisites
- We're going to be writing a web application in Python, and you'll need at least Python 3.9.
- We'll use the following frameworks and libraries:
  - fastapi (https://fastapi.tiangolo.com/) for writing our web application
  - uvicorn (https://www.uvicorn.org/) to run that web application
  - psycopg2 (https://www.psycopg.org/) for talking to PostgreSQL®
  - valkey-py (https://github.com/valkey-io/valkey-py) for talking to Valkey™
- CLI tooling: curl, psql and the Aiven CLI (avn), which we install below.
- A Valkey database and a PostgreSQL database:
  - We don't want to spend this tutorial setting up and managing databases ourselves, so we'll use Aiven for PostgreSQL® and Aiven for Valkey™. You can sign up for our free tier to follow along!
  - If you're following along without using Aiven, we still recommend deploying to a cloud provider like AWS or Google Cloud. This tutorial assumes the databases will be configured and deployed for you, like Aiven does.
Set up a Python virtual environment
We'll do our development at the command line in a Python virtual environment. This will prevent any of the work we're doing in this tutorial from affecting anything else you might be working on.
First, let's set up the Python virtual environment:
python3 -m venv venv
source venv/bin/activate
and then install the Python libraries we described above:
pip install fastapi uvicorn
pip install psycopg2-binary
pip install 'valkey[hiredis]'
pip install 'python-dotenv'
The psycopg2 documentation recommends installing psycopg2-binary for development and testing, because installing the psycopg2 package needs a C compiler and some other dependencies. For production it still recommends building the package yourself.
We're specifying valkey[hiredis] rather than just valkey, again as recommended by the package documentation, because it provides performance improvements. The single quotes stop the shell from seeing the [ and ] as special characters.
You can quickly check all of those are installed correctly by starting up Python:
python3
and then at the >>> prompt enter the following:
import fastapi
import uvicorn
import psycopg2
import valkey
import dotenv
If you don't get any errors from those, then you're good to go. Exit the Python shell by typing:
exit()
If you're on Unix/Mac you can use CTRL-D instead.
You can also type pip list at the terminal prompt to get a list of all the Python packages installed in this virtual environment, including packages needed by the ones you asked for.
Install cURL
curl is a command line tool to "do Internet transfers for resources specified as URLs using Internet protocols", and we shall use it in this tutorial to allow us to GET from and POST to our web application, as an alternative to using a web browser.
The Install cURL page in the Everything cURL book describes how to install it on Linux, Windows, macOS and using Docker.
In many operating systems, cURL is already installed. To check if it is, try calling it:
curl --help
Install the application-specific tools
Install psql
The PostgreSQL Tutorial Getting started documentation explains how to install PostgreSQL on your local machine. This should also provide psql, a command line tool for talking to a PostgreSQL service.
If you're using a Mac and Homebrew then you can type the following in a terminal to install libpq and its associated tools, which include psql:
brew install libpq
The output of that command will tell you how to make the libpq commands available on your path (it will say something like If you need to have libpq first in your PATH, run: ...). To get it set up temporarily in the current Bash session, we can do:
export PATH=/opt/homebrew/opt/libpq/bin:$PATH
Note that we only need psql, and we don't actually need to install the PostgreSQL service for this tutorial.
Install the Aiven command line tool
Aiven also provides a command line tool for working with its services, and it provides a way to run the service-specific command line tool for PostgreSQL or Valkey (respectively, psql or valkey-cli). See Get things done with the Aiven CLI for a bit more background, and the avn service cli documentation for some specifics.
We can install the Aiven CLI using pip, still in our virtual environment:
pip install aiven-client
You'll need to log in using a token. Follow the instructions at Create an authentication token to get a token, and then log in at the terminal using the following command, where YOUR_EMAIL_ADDRESS is the email address you used to log in to the Aiven Console. That will prompt you for the token that you copied.
avn user login YOUR_EMAIL_ADDRESS --token
Then specify the project you want. YOUR_PROJECT_NAME will be the name of the project that you used when getting the token.
avn project switch YOUR_PROJECT_NAME
When to use avn and when to use psql
You'll notice we specified using two command line tools in this tutorial. Let's talk about when to use which.
avn is Aiven's own CLI tool. Use it if you're following along with Aiven services. avn has the advantage that once you've logged in with a token, you just need the service name to connect, rather than the whole service URI.
psql is useful to know because it's transferable to any platform, not just Aiven, and lets us query some more specific things about PostgreSQL. If you aren't using Aiven, use psql.
Create an Aiven for PostgreSQL® service
Next, let's create a PostgreSQL instance.
If you're using Aiven, you can also use the command line to do this. In this tutorial, we'll use the Aiven console.
Navigate to the Aiven console. Create an account if you haven't already, or log in if you have. If you're new to Aiven, then you can use the free plan for this tutorial, or create a free trial if you prefer.
Click Create service and create an Aiven for PostgreSQL® service.
If you're using the Free plan:
- Service type: PostgreSQL®
- Cloud provider: AWS
- Service cloud region: Choose the region closest to you that supports the free plan.
- Service plan: Choose Free, which is OK for this tutorial.
- Service name: Choose something meaningful - we're using pg-app-backend
or for a paid service:
- Service type: PostgreSQL®
- Cloud provider: Choose the cloud provider of your choice.
- Service cloud region: Choose the region closest to you
- Service plan: Choose Hobbyist or Startup (Hobbyist is OK for this tutorial)
- Service name: Choose something meaningful - we're using pg-app-backend
When you're ready, click Create service.
This initializes a PostgreSQL® database for use on the cloud and region you choose, with a small service plan (if you were building a real application, you'd want to pick a larger plan).
Save the PostgreSQL connection information
When Aiven is done initializing your PostgreSQL service, it will direct you to the service's Overview page.
You can return to this page any time using the Services menu on the left hand menu and selecting the service you want to view. You can also use the Quick connect button to get convenient copy-and-paste commands and code snippets for a variety of CLI tools and programming connections!
Throughout this tutorial, we'll export common values, like the PostgreSQL service URI, as shell variables. This avoids committing potentially sensitive data to a version control system like git.
Copy the Service URI from the service page, and at the terminal prompt, set an environment variable to that string. For instance, if you're using the Bash shell:
export PG_SERVICE_URI='<the Service URI>'
Remember the single quotes, because the Service URI string probably has a question mark in it.
Put some data into the database
Download the dellstore2-normal-1.0.tar.gz file from the PostgreSQL website.
Unpack it - for instance at the terminal you can use:
tar -xf dellstore2-normal-1.0.tar.gz
Navigate to the dellstore2-normal-1.0 folder on your terminal:
cd dellstore2-normal-1.0
Next, we want to start the PostgreSQL command line tool to talk to PostgreSQL. If you've installed psql then you can do that using the Service URI you saved earlier:
psql $PG_SERVICE_URI
or you can use the avn tool, which needs to know the service name:
avn service cli pg-app-backend
In either case, that should leave you talking to the psql prompt, which will say something like:
psql (14.7 (Homebrew), server 14.6)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.

defaultdb=>
Create a new database, dellstore:
CREATE DATABASE dellstore;
Then change to the correct database:
\c dellstore
Once you're connected to the dellstore database, the prompt should change to dellstore=>.
Import the data by typing:
\i dellstore2-normal-1.0.sql
Then check what objects have been created in the database with:
\d
The output should look like this:
               List of relations
 Schema |           Name           |   Type   |  Owner
--------+--------------------------+----------+----------
 public | categories               | table    | avnadmin
 public | categories_category_seq  | sequence | avnadmin
 public | cust_hist                | table    | avnadmin
 public | customers                | table    | avnadmin
 public | customers_customerid_seq | sequence | avnadmin
 public | inventory                | table    | avnadmin
 public | orderlines               | table    | avnadmin
 public | orders                   | table    | avnadmin
 public | orders_orderid_seq       | sequence | avnadmin
 public | products                 | table    | avnadmin
 public | products_prod_id_seq     | sequence | avnadmin
 public | reorder                  | table    | avnadmin
(12 rows)
Exit psql by typing \q, and leave the dellstore2-normal-1.0 directory by typing:
cd ..
Create a simple web application
The following is a shortened version of the first example from the FastAPI documentation.
Create a file called main.py that contains:
from fastapi import FastAPI

app = FastAPI()

@app.get("/count")
def read_count():
    return {"count": 0}
Then run it in a terminal using uvicorn:
uvicorn main:app --reload
Which should say something like:
INFO:     Will watch for changes in these directories: ['/Users/tony.ibbs/sw/aiven/pg-redis-tutorial']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [75284] using StatReload
INFO:     Started server process [75286]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
As that suggests, the --reload switch means that when the source code for main.py changes, it will automatically be reloaded, so changes take effect without you having to stop and restart the web server every time you save.
We recommend keeping this terminal window open and leaving uvicorn running through the rest of the tutorial.
Go to http://127.0.0.1:8000/count in your web browser. You should see:
{"count":0}
Then, at the command line in a different terminal window, type
curl http://127.0.0.1:8000/count
The response should look like:
{"count":0}
Make it talk to the PostgreSQL database
Next, we'll change the read_count function so that it actually performs a COUNT of records in the database.
This is intentionally a slow operation, as COUNT scans the entire table, record by record. In most production use cases, you wouldn't do this too often.
In real life, a better example of a slow query would be calculating the checkout price of a basket on an ecommerce site. This needs a lot of database access, which takes time, and we don't want to recalculate everything just because the customer hit "refresh".
Change the import statements at the start of the file to bring in the extra libraries we need:
import os

from fastapi import FastAPI
from fastapi import HTTPException

import psycopg2
We're going to need the PostgreSQL connection information, so we'll get that from the environment variable we set earlier on:
PG_SERVICE_URI = os.getenv('PG_SERVICE_URI')
As we said before, it's a good idea not to hard-code "secrets" into source code, especially if you're going to save it to GitHub or somewhere else public. Since the PG_SERVICE_URI includes all the access information for the database, that definitely applies here. Using an environment variable means that we don't need to alter the code when the connection data changes - the same code can work for a local PostgreSQL and one running in the cloud.
Aiven partners with GitHub in its secret scanning program to prevent this for Aiven deployments, but it's a good idea to check manually anyway.
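We installed python-dotenv earlier, and if you prefer you can keep these values in a local .env file (also kept out of version control) and load them at startup. A minimal sketch of that approach:

# .env (not committed to version control) would contain lines like:
#   PG_SERVICE_URI=<the Service URI>

import os
from dotenv import load_dotenv

load_dotenv()  # reads key=value pairs from a .env file into the process environment

PG_SERVICE_URI = os.getenv('PG_SERVICE_URI')

The rest of the code stays exactly the same, since it still reads the value with os.getenv.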
We keep the startup of FastAPI itself:
app = FastAPI()
Then, add a function that connects to the PostgreSQL database. It double-checks that the environment variable was set, and complains if it wasn't!
def connect_to_pg():
    """Connect to the PostgreSQL backend."""
    if not PG_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Database not specified - "
            "environment variable PG_SERVICE_URI is empty/unset",
        )
    try:
        # Use the given service URI, but specify a different database
        return psycopg2.connect(PG_SERVICE_URI, database='dellstore')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to database: {e.__class__.__name__} {e}",
        )
The PG_SERVICE_URI specifies how to connect to the defaultdb database, and our psycopg2.connect call then actually asks for the dellstore database. In a real application, we'd probably specify the database name with another environment variable.
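For instance, a sketch of what that might look like (PG_DATABASE_NAME is a name we're inventing here for illustration, not something this tutorial sets up):

# Hypothetical: read the database name from the environment, defaulting to 'dellstore'
PG_DATABASE_NAME = os.getenv('PG_DATABASE_NAME', 'dellstore')

# ...and inside connect_to_pg(), connect to that database instead of a hard-coded name
return psycopg2.connect(PG_SERVICE_URI, database=PG_DATABASE_NAME)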
Next we can change the read_count method to ask for how many orders there are in our database (you can see a brief introduction to how to use psycopg2 to make SQL queries in its documentation, at Basic module usage):
@app.get("/count") def read_count(): conn = connect_to_pg() try: cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM orders;') count = cursor.fetchone()[0] except Exception as e: raise HTTPException( status_code=500, detail=f"Error querying database: {e.__class__.__name__} {e}", ) finally: conn.close() return {'count': count}
Let's test this first working version of the application.
If the uvicorn command is still running at your terminal, you should see it reload the new version of the application. Otherwise, start the application again:
uvicorn main:app --reload
and run the curl command again, in the other terminal:
curl http://127.0.0.1:8000/count
We should see an answer for the total number of orders in the database:
{"count":12000}
Why do we want caching?
When you run main.py as shown above, the code executes the COUNT every time the read_count() method is called – every time a GET query is made. We don't store the result anywhere, and thus we need to perform this expensive, slow operation every time.
Using Valkey as a cache solves this: we run one Valkey instance that all our backends can access. This lets us store the results of read_count() outside our code and our PostgreSQL database. If we're running a modern cloud-based application, other copies of our backend can access the results of read_count(), and we don't need to run expensive functions as often. Because the data Valkey stores is not being read from or written to disk, we can achieve very low latencies.
Create an Aiven for Valkey service
Next, let's create an Aiven for Valkey service. As we said, Valkey will serve as our caching layer.
In most cases, you'll want to deploy Valkey to the same region as your PostgreSQL database, as this reduces latency.
Go back to the Aiven console. If you're still on the PostgreSQL service page, select Services from the navigation sidebar to get back to the "Current services" page.
Click Create service and create an Aiven for Valkey service. If you chose the free plan when creating the PostgreSQL service, then that probably makes sense for Valkey as well.
If you're using the Free plan:
- Service type: Valkey™
- Cloud provider: AWS
- Service cloud region: Choose the same region as your PostgreSQL database.
- Service plan: Choose Free, which is OK for this tutorial.
- Service name: Choose something meaningful - we're using valkey-app-cache
or for a paid service:
- Service type: Valkey™
- Cloud provider: Choose the same cloud provider as for your PostgreSQL database.
- Service cloud region: Choose the same region as your PostgreSQL database.
- Service plan: Choose Hobbyist or Startup (Hobbyist is OK for this tutorial)
- Service name: Choose something meaningful - we're using valkey-app-cache
When you're ready, click Create service. As before, if you were building a real application, you'd want to pick a larger plan.
Save the Valkey connection information
When Aiven is done spinning up your Valkey database, make a note of the Valkey Service URI.
Export this as an environment variable, as we did with PostgreSQL.
First, kill the uvicorn process, if it's still running, and then, if you're using Bash, type:
export VALKEY_SERVICE_URI='<the Valkey Service URI>'
Reminder on environment variables
As we're using more than one terminal window, it's easy to forget to set the PostgreSQL and Valkey environment variables (see Save the PostgreSQL connection information earlier, and Save the Valkey connection information just before this). In summary:
- Before running uvicorn, you should make sure that PG_SERVICE_URI and VALKEY_SERVICE_URI are both set, as the Python application needs both of them.
- Before running psql $PG_SERVICE_URI, you should make sure that PG_SERVICE_URI is set (that sounds obvious, but I've made the mistake myself).
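If you want a quick sanity check before starting the app, a small Python snippet (using the same os.getenv calls the application uses) will tell you whether both values are visible in the current shell:

import os

# Print whether each service URI is available in this shell's environment
for name in ('PG_SERVICE_URI', 'VALKEY_SERVICE_URI'):
    value = os.getenv(name)
    print(f'{name} is {"set" if value else "NOT set"}')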
Connect to Valkey and cache the GET method
Now let's add caching for the read_count function into main.py.
First, we add an import for the Valkey library. We'll also import the logging package, so we can output messages to the terminal where we're running the application to explain what we're doing with the cache.
Change the start of main.py (everything up to the app = FastAPI()) to look like the following.
import os

from fastapi import FastAPI
from fastapi import HTTPException

import logging
import psycopg2
import valkey

# turn on logging
logging.basicConfig(level=logging.DEBUG)

# import environment variables
PG_SERVICE_URI = os.getenv('PG_SERVICE_URI')
VALKEY_SERVICE_URI = os.getenv('VALKEY_SERVICE_URI')

app = FastAPI()
This has added:
- import statements for the valkey and logging packages
- a mapping for the environment variable we created earlier, VALKEY_SERVICE_URI
- a statement to turn logging on
Now, after the connect_to_pg function, add a new connect_to_valkey() function, which will:
- Take the VALKEY_SERVICE_URI variable and pass it to the valkey.from_url() function to return an instance of Valkey for us.
- Catch any errors we make along the way.
The new function looks like this:
def connect_to_valkey():
    """Connect to the Valkey backend."""
    if not VALKEY_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Valkey service not specified - "
            "environment variable VALKEY_SERVICE_URI is empty/unset",
        )
    try:
        return valkey.from_url(VALKEY_SERVICE_URI)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to Valkey: {e.__class__.__name__} {e}",
        )
Finally, modify the read_count() function to add values to Valkey when called. We do this by:
- Connecting to Valkey using our connect_to_valkey() function.
- Creating a variable to store our cache key in, orders:count. It's a good idea to name your cache keys something meaningful.
- Looking in Valkey for any value stored under the orders:count key.
  - If a value is found in Valkey, we return the value and exit the function.
  - If not, we connect to PostgreSQL and run the SELECT COUNT(*) FROM orders; statement, add that value to the Valkey cache, and return the value before exiting.
The read_count() function should now look like:
@app.get("/count") def read_count(): cache_key = 'orders:count' valkey_conn = connect_to_valkey() # Is it already in the cache? If so, just use it count = valkey_conn.get(cache_key) if count: logging.debug(f'Found {cache_key}, value is {count}') return {'count': count} pg_conn = connect_to_pg() try: cursor = pg_conn.cursor() cursor.execute('SELECT COUNT(*) FROM orders;') count = cursor.fetchone()[0] except Exception as e: raise HTTPException( status_code=500, detail=f"Error querying database: {e.__class__.__name__} {e}", ) finally: pg_conn.close() # Remember to add it to the cache logging.debug(f'Caching {cache_key}, value is {count}') valkey_conn.set(cache_key, count) return {'count': count}
Let's test the changes to our app.
Start the application with uvicorn again (remember you'll need the PG_SERVICE_URI and VALKEY_SERVICE_URI environment variables set - see Save the PostgreSQL connection information and Save the Valkey connection information).
uvicorn main:app --reload
and run our curl command in the other terminal:
curl http://127.0.0.1:8000/count
We should get back the result we expect, just as before:
{"count":12000}
but now the uvicorn output should say:
DEBUG:root:Caching orders:count, value is 12000
INFO:     127.0.0.1:51584 - "GET /count HTTP/1.1" 200 OK
If we run the same curl command again
curl http://127.0.0.1:8000/count
We should get the same output
{"count":"12000"}
but we can see from the uvicorn output that this time it has come from the cache:
DEBUG:root:Found orders:count, value is b'12000'
INFO:     127.0.0.1:51587 - "GET /count HTTP/1.1" 200 OK
The orders:count value in the DEBUG output is shown as b'12000' because Valkey strings are byte sequences, and so Python uses bytes objects to represent them. Bytes literals are shown like string literals, but with a b before the quotes.
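This is also why the cached response is {"count":"12000"} (a string) rather than a number. If you'd rather return a plain number in both cases, one option is to convert the cached value before returning it - a sketch of what that change to read_count would look like:

    count = valkey_conn.get(cache_key)
    if count:
        count = int(count)   # int() accepts ASCII digits in bytes, so b'12000' becomes 12000
        logging.debug(f'Found {cache_key}, value is {count}')
        return {'count': count}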
Add a POST method to the application
Next, let's add a POST function to the application, so we can add new records to our PostgreSQL database.
This may seem a bit disconnected, but hang tight - you'll understand why we're doing this in the next step. Refer to the complete code if needed.
We need to add another import to main.py, this time to allow us to describe the arguments to our POST - add it after the other from import lines:
from pydantic import BaseModel
Then, at the end of the file, we define the API that we're going to use for the new order (this will correspond to the JSON we use):
class Item(BaseModel):
    """A new order."""
    orderdate: str   # e.g., 2023-03-10
    customerid: int
    netamount: float
    tax: float
And after that we can add our POST function which adds a new order to the database:
@app.post("/add/") def post_add(item: Item): """Add a new order.""" order = item.dict() logging.debug(f'Adding order {order}') pg_conn = connect_to_pg() try: cursor = pg_conn.cursor() cursor.execute( 'INSERT INTO orders' ' (orderdate, customerid, netamount, tax, totalamount)' ' VALUES (%s, %s, %s, %s, %s);', (item.orderdate, item.customerid, item.netamount, item.tax, item.netamount + item.tax), ) pg_conn.commit() logging.debug('Added new order') except Exception as e: raise HTTPException( status_code=500, detail=f"Error updating database: {e.__class__.__name__} {e}", ) finally: pg_conn.close() return {'updated': {'customer': item.customerid}}
We can test it using curl:
curl -X 'POST' http://127.0.0.1:8000/add/ \
  -H 'Content-Type: application/json' \
  -d '{"orderdate":"2023-03-10","customerid":9999,"netamount":23.0,"tax":2.0}'
And we should get the following output back:
{"updated":{"customer":9999}}
The application's debugging should show the following:
DEBUG:root:Adding order {'orderdate': '2023-03-10', 'customerid': 9999, 'netamount': 23.0, 'tax': 2.0}
DEBUG:root:Added new order
We can also examine the new database row using psql. Make sure that the PG_SERVICE_URI environment variable is set in your terminal (see Save the PostgreSQL connection information earlier), then connect:
psql $PG_SERVICE_URI
Next, change to the correct database:
\c dellstore
Then make a query:
select * from orders where customerid=9999;
which should respond as follows:
 orderid | orderdate  | customerid | netamount | tax  | totalamount
---------+------------+------------+-----------+------+-------------
    7918 | 2004-08-28 |       9999 |     51.19 | 4.22 |       55.41
   12006 | 2023-03-10 |       9999 |     23.00 | 2.00 |       25.00
(2 rows)
Again, exit from psql using \q.
But if we use curl to ask for the total number of orders again:
curl http://127.0.0.1:8000/count
We still get the old answer:
{"count":"12000"}
Here we have a problem: our cache is out of date. We only update the cache when we call read_count() for the first time. When we update the database from another method, we aren't updating the cache to reflect the change.
Dealing with out-of-date caches
Invalidating caches is a common issue in software development. Our cache is only as useful as it is accurate. So what do we need to do to ensure our cache is accurate?
Well, for starters, we need to ensure that our count cache is invalidated whenever we call our POST method.
Invalidating the cache
We can solve the problem of our cache getting outdated by "throwing away" the cached value when we add a new order. To do that we need to add the following lines to the start of the post_add function, just before the call to connect_to_pg() (the comments aren't strictly necessary, but they make it clearer what we're doing):
# Invalidate the cache entry whether we succeed in doing the update or not
# - we don't expect the update to fail, and it shouldn't hurt to clear the
#   cache a bit too often
cache_key = 'orders:count'
valkey_conn = connect_to_valkey()
# The Valkey command is `del`, but that's special in Python
valkey_conn.delete(cache_key)
The Valkey command is del, but that's a Python reserved word, so can't be used as a method name. The Valkey library chooses delete as a good alternative.
The whole function now looks like:
@app.post("/add/") def post_add(item: Item): """Add a new order.""" order = item.dict() logging.debug(f'Adding order {order}') # Invalidate the cache entry whether we succeed in doing the update or not # - we don't expect the update to fail, and it shouldn't hurt to clear the # cache a bit too often cache_key = 'orders:count' valkey_conn = connect_to_valkey() # The Valkey command is `del`, but that's special in Python valkey_conn.delete(cache_key) pg_conn = connect_to_pg() try: cursor = pg_conn.cursor() cursor.execute( 'INSERT INTO orders' ' (orderdate, customerid, netamount, tax, totalamount)' ' VALUES (%s, %s, %s, %s, %s);', (item.orderdate, item.customerid, item.netamount, item.tax, item.netamount + item.tax), ) pg_conn.commit() logging.debug('Added new order') except Exception as e: raise HTTPException( status_code=500, detail=f"Error updating database: {e.__class__.__name__} {e}", ) finally: pg_conn.close() return {'updated': {'customer': item.customerid}}
And now when we add a new order:
curl -X 'POST' http://127.0.0.1:8000/add/ \
  -H 'Content-Type: application/json' \
  -d '{"orderdate":"2023-03-10","customerid":9999,"netamount":23.0,"tax":2.0}'
and ask for the count:
curl http://127.0.0.1:8000/count
We get back an updated value, as we'd expect:
{"count":12002}
Your count value may be slightly different than the example above – don't worry about it, so long as it's over 12000.
Before leaving this section, let's just tidy up a little bit. We've got two definitions of cache_key in separate functions (read_count and post_add), and while they're both the same, it would be better to define that string just once.
So add another constant definition, before the app = FastAPI() line:
CACHE_KEY = 'orders:count'
Then, delete the setting of cache_key by removing this line from both the read_count and post_add functions:
cache_key = 'orders:count'
Then change the uses of cache_key to be uses of CACHE_KEY instead. Don't forget the uses in the logging calls.
Check it's all working by running the two curl commands (the POST and the GET) once more.
Specifying a TTL ("time to live")
Next, let's set a Time to Live, or TTL.
This is a common practice when working with caches: it specifies how long we should let a cache stay valid for. As we add more functions and more microservices to our application, other applications might modify a database entry in the background. A TTL ensures that we invalidate the cache on a regular basis, reducing the chances that our cache is inaccurate to the underlying data.
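Before wiring this into the app, here's a quick standalone sketch of what a TTL does, using the same valkey-py client (it assumes VALKEY_SERVICE_URI is set in your environment, and demo:key is just a throwaway key name for this illustration):

import os
import time
import valkey

conn = valkey.from_url(os.getenv('VALKEY_SERVICE_URI'))
conn.set('demo:key', 42, ex=5)    # cache a value, but only for 5 seconds
print(conn.ttl('demo:key'))       # seconds remaining before the key expires, e.g. 5
time.sleep(6)
print(conn.get('demo:key'))       # None - the key has expired and been removed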
Let's set the cache timeout, in seconds, with a new environment variable. Kill the uvicorn process, and then, in that terminal, type:
export CACHE_TIMEOUT_SECONDS=5
In main.py, let's retrieve the new environment variable, and set a default value in case the environment variable isn't set. This new code should go after the code retrieving the PG_SERVICE_URI and VALKEY_SERVICE_URI values:
CACHE_TIMEOUT_SECONDS = 120
cache_timeout_str = os.getenv('CACHE_TIMEOUT_SECONDS', str(CACHE_TIMEOUT_SECONDS))
try:
    CACHE_TIMEOUT_SECONDS = int(cache_timeout_str)
except ValueError as e:
    logging.error(f'Bad value {cache_timeout_str}, using {CACHE_TIMEOUT_SECONDS}')
logging.info(f'Cache timeout is {CACHE_TIMEOUT_SECONDS}s')
Next, use the cache timeout by altering the call of the Valkey set method at the end of our read_count function.
We just need to add the ex parameter to specify the timeout:
valkey_conn.set(CACHE_KEY, count, ex=CACHE_TIMEOUT_SECONDS)
Now, if we POST a new order, the next GET for a count is cached, and we can see that subsequent GET requests only find a cached value if they are made within the CACHE_TIMEOUT_SECONDS time.
Try it yourself and see
Restart the application:
uvicorn main:app --reload
Make a new POST request, to invalidate the cache.
Then make a GET request followed quickly by another. The second GET will use the cache. But if you now wait longer than the TTL and then make another GET request, the cache entry will have expired, so the database will need to be queried again.
In more detail:
Make a new POST:
curl -X 'POST' http://127.0.0.1:8000/add/ \ -H 'Content-Type: application/json' \ -d '{"orderdate":"2023-03-10","customerid":9999,"netamount":23.0,"tax":2.0}'
Then a GET:
curl http://127.0.0.1:8000/count
Then another GET:
curl http://127.0.0.1:8000/count
At this point uvicorn should show the following:
DEBUG:root:Adding order {'orderdate': '2023-03-10', 'customerid': 9999, 'netamount': 23.0, 'tax': 2.0}
DEBUG:root:Added new order
INFO:     127.0.0.1:60078 - "POST /add/ HTTP/1.1" 200 OK
DEBUG:root:Caching orders:count, value is 12004
INFO:     127.0.0.1:60090 - "GET /count HTTP/1.1" 200 OK
DEBUG:root:Found orders:count, value is b'12004'
INFO:     127.0.0.1:60098 - "GET /count HTTP/1.1" 200 OK
But if you now wait for 10 seconds (remember, the TTL is 5 seconds) and do a GET:
curl http://127.0.0.1:8000/count
The application should report the following because the cache entry has expired:
DEBUG:root:Caching orders:count, value is 12004
INFO:     127.0.0.1:60152 - "GET /count HTTP/1.1" 200 OK
Using a Python decorator
There are two slight problems with the code we've got now:
- Short functions that do fewer things are generally easier to understand. Our GET and POST functions are mixing up the "talk to the database" and the "handle the cache" code, which are really two different things.
- If we add more GET or POST functions, we'll have to remember to add caching to them all, and it's surprisingly easy to forget to do this.
The DRY ("Don't Repeat Yourself") principle suggests "Every piece of knowledge must have a single, unambiguous, authoritative representation within a system". We can use Python decorators to re-implement our caching, cleaning up the code and helping to solve the two concerns above.
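If decorators are new to you, the core idea is a function that takes a function and returns a wrapped replacement for it. Here's a toy illustration, separate from our app, with log_calls and greet being names made up just for this example:

import functools

def log_calls(func):
    @functools.wraps(func)            # keep the wrapped function's name and signature
    def wrapper(*args, **kwargs):
        print(f'Calling {func.__name__}')
        return func(*args, **kwargs)  # run the original function and pass its result back
    return wrapper

@log_calls
def greet(name):
    return f'Hello, {name}'

print(greet('world'))                 # prints "Calling greet", then "Hello, world"

Our caching decorators below follow exactly this shape, except the "extra work" in the wrapper is checking or clearing the cache rather than printing.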
At the start of main.py, before the import os line, add:
import functools
(It's normally considered good practice to group the importing of things from the Python standard library together.)
Our first decorator is for GET functions.
Before the read_count function, add the following:
def check_cache(func):
    @functools.wraps(func)
    def wrapper():
        valkey_conn = connect_to_valkey()
        # Is it already in the cache? If so, just use it
        count = valkey_conn.get(CACHE_KEY)
        if count:
            logging.debug(f'Found {CACHE_KEY}, value is {count}')
            return {'count': count}

        logging.debug(f'Calling {func} to find the count')
        retval = func()
        count = retval['count']

        # Remember to add it to the cache
        logging.debug(f'Caching {CACHE_KEY}, value is {count}')
        valkey_conn.set(CACHE_KEY, count, ex=CACHE_TIMEOUT_SECONDS)

        return retval
    return wrapper
Then, decorate the read_count function:
@app.get("/count") @check_cache def read_count():
There are two things to note here. Firstly, the @functools.wraps(func) line in the decorator itself, so that the wrapper has the same signature as the wrapped function. Secondly, the order of the decorators matters - the FastAPI decorator needs to come first.
We now remove the caching code from the inside of the function, leaving us with:
@app.get("/count") @check_cache def read_count(): pg_conn = connect_to_pg() try: cursor = pg_conn.cursor() cursor.execute('SELECT COUNT(*) FROM orders;') count = cursor.fetchone()[0] except Exception as e: raise HTTPException( status_code=500, detail=f"Error querying database: {e.__class__.__name__} {e}", ) finally: pg_conn.close() return {'count': count}
Our second decorator is for POST functions.
Add the following before the post_add function:
def clear_cache(func):
    @functools.wraps(func)
    def wrapper(item: Item):
        logging.debug(f'Clearing cache')
        valkey_conn = connect_to_valkey()
        # Invalidate the cache entry whether we succeed in doing the update or not
        # - we don't expect the update to fail, and it shouldn't hurt to clear the
        #   cache a bit too often
        valkey_conn.delete(CACHE_KEY)  # The Valkey command is `del`, but that's special in Python
        return func(item)
    return wrapper
Next, decorate it:
@app.post("/add/") @clear_cache def post_add(item: Item):
Again, we now remove the caching code from inside the post_add function:
@app.post("/add/") @clear_cache def post_add(item: Item): """Add a new order.""" order = item.dict() logging.debug(f'Adding order {order}') pg_conn = connect_to_pg() try: cursor = pg_conn.cursor() cursor.execute( 'INSERT INTO orders' ' (orderdate, customerid, netamount, tax, totalamount)' ' VALUES (%s, %s, %s, %s, %s);', (item.orderdate, item.customerid, item.netamount, item.tax, item.netamount + item.tax), ) pg_conn.commit() logging.debug('Added new order') except Exception as e: raise HTTPException( status_code=500, detail=f"Error updating database: {e.__class__.__name__} {e}", ) finally: pg_conn.close() return {'updated': {'customer': item.customerid}}
Check that all continues to work by running the curl commands again, and looking at the application logs to see that the expected logging messages are still being produced.
A more realistic database call
Reviewing our application so far, we can:
- Ask for the total number of orders, using the Valkey cache
- Post a new order for a given customer to the PostgreSQL database and invalidate the cache
- And we've simplified the cache handling by using decorators
Let's add another method to ask for the total number of orders per customer.
As we've discussed before, count is a slow process on its own, and you wouldn't use it in most production settings. Asking the database for a count filtered by customerid is slower still, but a more realistic operation. Let's cache it!
This means we'll have one cache for the total number of orders, and one for each customer.
To start, we need to add another import statement before the other from imports at the top of main.py. We'll need that when we get round to updating the read_count function in a moment.
from typing import Union
Then we need to replace the definition of CACHE_KEY at the start of main.py with the following definition of CACHE_KEY_FORMAT. This uses a format string to describe the overall structure of our cache key strings.
CACHE_KEY_FORMAT = 'num_orders:{0}'
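To make the key structure concrete, here's what that format string produces for the two cases we care about (the 'all' marker is what we'll use later for the "all orders" count):

CACHE_KEY_FORMAT = 'num_orders:{0}'

print(CACHE_KEY_FORMAT.format('all'))   # num_orders:all  - the count of all orders
print(CACHE_KEY_FORMAT.format(9999))    # num_orders:9999 - the count for customer 9999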
We need to alter the check_cache decorator to understand that it might be caching data for a specific customer id. So we change the signature to take an optional (integer) customerid argument, and calculate the cache key from that.
The function should end up as follows:
def check_cache(func):
    @functools.wraps(func)
    def wrapper(customerid: Union[int, None] = None, *args, **kwargs):
        valkey_conn = connect_to_valkey()

        cache_key = CACHE_KEY_FORMAT.format('all' if customerid is None else customerid)
        logging.debug(f'Checking cache key {cache_key}')

        # Is it already in the cache? If so, just use it
        count = valkey_conn.get(cache_key)
        if count:
            logging.debug(f'Found {cache_key}, value is {count}')
            return {'count': count}

        logging.debug(f'Calling {func} to find the count')
        retval = func(customerid)
        count = retval['count']

        # Remember to add it to the cache
        logging.debug(f'Caching {cache_key}, value is {count}')
        valkey_conn.set(cache_key, count, ex=CACHE_TIMEOUT_SECONDS)

        return retval
    return wrapper
The GET function gains the same optional customerid argument, allowing us to specify the customer id if we wish - we'll see how that's used in an actual query later on:
def read_count(customerid: Union[int, None] = None):
Next, let's change the database query in that function to account for whether or not the user specified a customer id. Find this line:
cursor.execute('SELECT COUNT(*) FROM orders;')
Replace it with the following code:
        if customerid is None:
            logging.debug('Looking up all orders')
            cursor.execute('SELECT COUNT(*) FROM orders;')
        else:
            logging.debug(f'Looking up orders for customerid {customerid}')
            cursor.execute('SELECT COUNT(*) FROM orders WHERE customerid=%s;',
                           (customerid,))
(It's Python, so make sure to keep the indentation correct!)
The clear_cache decorator now needs to clear the cache for all orders, and also the cache for the specific customer:
def clear_cache(func):
    @functools.wraps(func)
    def wrapper(item: Item):
        logging.debug('Clearing caches')
        all_cache_key = CACHE_KEY_FORMAT.format('all')
        this_cache_key = CACHE_KEY_FORMAT.format(item.customerid)
        logging.debug(f'Clearing caches for {all_cache_key} and {this_cache_key}')

        valkey_conn = connect_to_valkey()
        # Invalidate the cache entries whether we succeed in doing the update or not
        # - we don't expect the update to fail, in general, and it shouldn't hurt
        #   to clear the cache
        # The Valkey command is `del`, but that's special in Python
        valkey_conn.delete(all_cache_key)
        valkey_conn.delete(this_cache_key)
        return func(item)
    return wrapper
We don't need to change the post_add function at all.
The curl query for all the orders is the same as before, but now we can ask for just the number of orders for a particular customer.
Now if we POST an order for customer 9999:
curl -X 'POST' http://127.0.0.1:8000/add/ -H 'Content-Type: application/json' -d '{"orderdate":"2023-03-10","customerid":9999,"netamount":23.0,"tax":2.0}'
We should see the following response:
{"updated":{"customer":9999}}
In addition, we can also ask "how many orders are there for customer 9999":
curl 'http://127.0.0.1:8000/count?customerid=9999'
The answer we get back depends on how many orders we've actually created for customer 9999, but should be of the form:
{"count":10}
We can confirm that the overall count still works by sending a request:
curl 'http://127.0.0.1:8000/count'
We'll see the same kind of result as before. The particular answer depends on how many orders we've created for customer 9999 along the way:
{"count":12008}
All the code
As a recap, here's the final program, all in one:
import functools
import os
from typing import Union

from fastapi import FastAPI
from fastapi import HTTPException
from pydantic import BaseModel

import logging
import psycopg2
import valkey

# turn on logging
logging.basicConfig(level=logging.DEBUG)

# import environment variables
PG_SERVICE_URI = os.getenv('PG_SERVICE_URI')
VALKEY_SERVICE_URI = os.getenv('VALKEY_SERVICE_URI')

CACHE_TIMEOUT_SECONDS = 120
cache_timeout_str = os.getenv('CACHE_TIMEOUT_SECONDS', str(CACHE_TIMEOUT_SECONDS))
try:
    CACHE_TIMEOUT_SECONDS = int(cache_timeout_str)
except ValueError as e:
    logging.error(f'Bad value {cache_timeout_str}, using {CACHE_TIMEOUT_SECONDS}')
logging.info(f'Cache timeout is {CACHE_TIMEOUT_SECONDS}s')

CACHE_KEY_FORMAT = 'num_orders:{0}'

app = FastAPI()


def connect_to_pg():
    """Connect to the PostgreSQL backend."""
    if not PG_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Database not specified - "
            "environment variable PG_SERVICE_URI is empty/unset",
        )
    try:
        # Use the given service URI, but specify a different database
        return psycopg2.connect(PG_SERVICE_URI, database='dellstore')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to database: {e.__class__.__name__} {e}",
        )


def connect_to_valkey():
    """Connect to the Valkey backend."""
    if not VALKEY_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Valkey service not specified - "
            "environment variable VALKEY_SERVICE_URI is empty/unset",
        )
    try:
        return valkey.from_url(VALKEY_SERVICE_URI)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to Valkey: {e.__class__.__name__} {e}",
        )


def check_cache(func):
    @functools.wraps(func)
    def wrapper(customerid: Union[int, None] = None, *args, **kwargs):
        valkey_conn = connect_to_valkey()

        cache_key = CACHE_KEY_FORMAT.format('all' if customerid is None else customerid)
        logging.debug(f'Checking cache key {cache_key}')

        # Is it already in the cache? If so, just use it
        count = valkey_conn.get(cache_key)
        if count:
            logging.debug(f'Found {cache_key}, value is {count}')
            return {'count': count}

        logging.debug(f'Calling {func} to find the count')
        retval = func(customerid)
        count = retval['count']

        # Remember to add it to the cache
        logging.debug(f'Caching {cache_key}, value is {count}')
        valkey_conn.set(cache_key, count, ex=CACHE_TIMEOUT_SECONDS)

        return retval
    return wrapper


@app.get("/count")
@check_cache
def read_count(customerid: Union[int, None] = None):
    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        if customerid is None:
            logging.debug('Looking up all orders')
            cursor.execute('SELECT COUNT(*) FROM orders;')
        else:
            logging.debug(f'Looking up orders for customerid {customerid}')
            cursor.execute('SELECT COUNT(*) FROM orders WHERE customerid=%s;',
                           (customerid,))
        count = cursor.fetchone()[0]
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error querying database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()
    return {'count': count}


class Item(BaseModel):
    """A new order."""
    orderdate: str   # e.g., 2023-03-10
    customerid: int
    netamount: float
    tax: float


def clear_cache(func):
    @functools.wraps(func)
    def wrapper(item: Item):
        logging.debug('Clearing caches')
        all_cache_key = CACHE_KEY_FORMAT.format('all')
        this_cache_key = CACHE_KEY_FORMAT.format(item.customerid)
        logging.debug(f'Clearing caches for {all_cache_key} and {this_cache_key}')

        valkey_conn = connect_to_valkey()
        # Invalidate the cache entries whether we succeed in doing the update or not
        # - we don't expect the update to fail, in general, and it shouldn't hurt
        #   to clear the cache
        # The Valkey command is `del`, but that's special in Python
        valkey_conn.delete(all_cache_key)
        valkey_conn.delete(this_cache_key)
        return func(item)
    return wrapper


@app.post("/add/")
@clear_cache
def post_add(item: Item):
    """Add a new order."""
    order = item.dict()
    logging.debug(f'Adding order {order}')

    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        cursor.execute(
            'INSERT INTO orders'
            ' (orderdate, customerid, netamount, tax, totalamount)'
            ' VALUES (%s, %s, %s, %s, %s);',
            (item.orderdate, item.customerid, item.netamount, item.tax,
             item.netamount + item.tax),
        )
        pg_conn.commit()
        logging.debug('Added new order')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error updating database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()

    return {'updated': {'customer': item.customerid}}
That's all, folks!
And there you have it! An application with caching on multiple values using PostgreSQL and Valkey.
Before you move on, don't forget to pause or terminate your Aiven services!
Additional resources
- Check out the application's autogenerated API docs at http://127.0.0.1:8000/docs (only while uvicorn is actually running the app).
- Look at FastAPI Cache as a possible solution for production settings – we've not tried it, but it looks like a useful shortcut!
- Web frameworks like Django and Rails generally make it easy to add a cache, and Redis is often used. See our article on Set up Django to use Aiven for Valkey for how to get Django to talk to Valkey (and there's also a companion article on Set up Django to use Aiven for PostgreSQL®).
- Read up on cURL. It's an incredibly powerful and versatile tool and you can find out a lot more at Everything curl.
- Find out more about Python decorators at Real Python's Primer on Python Decorators. They also provide a nice introduction to Logging in Python.
- We didn't discuss ETags at all, but they're another thing that a production system might consider (and fastapi-cache indicates it has at least some support for them).
- Lastly, the example code we've given uses synchronous Valkey connections, which can block the event loop and degrade performance. A better option might be to use asynchronous connections, with async and await. See the Asyncio Examples in the valkey-py documentation, and the sketch just after this list.
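As a very rough illustration of the shape that takes - untested here, and assuming valkey.asyncio mirrors the synchronous from_url/get API as it does in redis-py:

import asyncio
import os

import valkey.asyncio as valkey_async

async def get_cached_count():
    conn = valkey_async.from_url(os.getenv('VALKEY_SERVICE_URI'))
    try:
        # await the call instead of blocking the event loop
        return await conn.get('num_orders:all')
    finally:
        await conn.aclose()   # assumption: aclose() is the async close, as in recent redis-py/valkey-py

print(asyncio.run(get_cached_count()))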
If you want to learn a bit more about PostgreSQL and Valkey themselves, check out:
- Aiven for PostgreSQL® documentation
- Aiven for Valkey™ documentation
- PostgreSQL's official documentation
- Valkey's official documentation
and you can also follow up the Python libraries we used:
- For fastapi check out the tutorial.
- As part of writing this article, we learnt that psycopg now has a version 3. We used psycopg2 in our examples (documentation at https://www.psycopg.org/docs/) but you might be interested in exploring https://www.psycopg.org/psycopg3/docs/ as well.
- The valkey-py documentation is at https://valkey-py.readthedocs.io/ and has copious links back into the main Valkey documentation.