Caching is used to speed up cloud applications, particularly for database reads. Read on to learn more, and find out how to build caching with Valkey™ into a simple PostgreSQL® web app.
How to write a simple Python web application that accesses data in a PostgreSQL® database
How to use curl at the command line to make GET and POST requests to that web application
Why caching the GET response is a good idea, and how to do that with Valkey™, a key-value store using the Redis serialization protocol
Some basics about cache invalidation - how to make sure the cache doesn't get out-of-date
Overview
With any sufficiently complex application, performance becomes a primary concern for optimization. One of the key metrics for measuring performance of any software is the speed of reading and writing from a database.
Most applications repeatedly store (write) and retrieve (read) some kind of data. However, in most cases the number of reads far exceeds the number of writes. To make reading more efficient, we use a caching layer. Caching is the act of writing data to a location (for instance, a block of memory) specifically designed for quick retrieval of common requests.
For example, imagine a customer creating a profile on an online store. The customer fills out their name, phone number, and address, which we store to our database. Because these pieces of information are required at multiple points in the checkout process, it's worth storing them in a cache the first time we retrieve them from the database. This speeds up application processing time for all following retrievals. The difference in retrieval speed for any single request might be mere milliseconds, but when we develop applications for the cloud, for millions of users simultaneously, those milliseconds add up.
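The pattern just described is often called "cache-aside", and it's easy to sketch in plain Python. The sketch below uses a dictionary as the "cache" and an invented fetch_profile_from_db function standing in for a real database read:

```python
import time

def fetch_profile_from_db(customer_id):
    """A hypothetical slow lookup, standing in for a real database read."""
    time.sleep(0.01)  # simulate database latency
    return {"name": "Sam", "phone": "555-0100"}

cache = {}  # in a real system this would be Valkey, not a dict

def get_profile(customer_id):
    # Cache hit: return the stored value without touching the database
    if customer_id in cache:
        return cache[customer_id]
    # Cache miss: read from the database and remember the result
    profile = fetch_profile_from_db(customer_id)
    cache[customer_id] = profile
    return profile

get_profile(42)   # slow: goes to the "database"
get_profile(42)   # fast: served from the cache
```

The rest of this tutorial builds exactly this shape, with PostgreSQL as the database and Valkey as the cache.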
Caching cloud applications introduces some complexities, but also offers opportunities for optimization. Instead of using a block of memory (and the unbounded, chaotic nature that entails), we can use two databases! We can use one database as a data store, and one as a cache. Further, we can choose a database for our data store that is optimized for things like concurrency control, and one for our cache optimized for speedy reads and writes, while still taking advantage of everything the cloud offers us in terms of scalability.
Prerequisites
We're going to be writing a web application in Python, and you'll need at least Python 3.9.
psql for PostgreSQL. This is useful as it's the standard tool that will work for any PostgreSQL service on any system.
We built the avn CLI to take advantage of all the features Aiven offers for its products, and this works too! We'll provide examples with both in this tutorial.
If you're following along without using Aiven, we still recommend deploying to a cloud provider like AWS or Google Cloud. This tutorial assumes the databases will be configured and deployed for you like Aiven does.
Set up a Python virtual environment
We'll do our development at the command line in a Python virtual environment. This will prevent any of the work we're doing in this tutorial from affecting anything else you might be working on.
First, let's set up the Python virtual environment:
python3 -m venv venv
source venv/bin/activate
and then install the Python libraries we described above:
pip install fastapi uvicorn 'psycopg[binary]' 'valkey[libvalkey]'
The psycopg documentation recommends installing psycopg-binary for development and testing, because installing the psycopg package needs a C compiler and some other dependencies. For production it still recommends building the package yourself.
We're specifying valkey[libvalkey] rather than just valkey, again as recommended by the package documentation, because it provides performance improvements. The single quotes stop the shell from seeing the [ and ] as special characters.
You can quickly check all of those are installed correctly by starting up Python:
python
and then at the >>> prompt enter the following:
import fastapi
import uvicorn
import psycopg
import valkey
If you don't get any errors from those, then you're good to go. Exit the Python shell by typing:
exit()
If you're on Unix/Mac you can use CTRL-D instead.
You can also type pip list at the terminal prompt to get a list of all the Python packages installed in this virtual environment, including packages needed by the ones you asked for.
Install cURL
curl is a command line tool to "do Internet transfers for resources specified as URLs using Internet protocols", and we shall use it in this tutorial to allow us to GET from and POST to our web application, as an alternative to using a web browser.
The Install cURL page in the Everything cURL book describes how to install it on Linux, Windows, macOS and using Docker.
In many operating systems, cURL is already installed. To check if it is, try calling it:
curl --version
Install the application specific tools
Install psql
The PostgreSQL Tutorial Getting started documentation explains how to install PostgreSQL on your local machine. This should also provide psql, a command line tool for talking to a PostgreSQL service.
If you're using a Mac and Homebrew then you can type the following in a terminal to install libpq and its associated tools, which include psql:
brew install libpq
The output of that command will tell you how to make the libpq commands available on your path (it will say something like If you need to have libpq first in your PATH, run: ...). To get it set up temporarily in the current Bash session, we can do:
export PATH="$(brew --prefix libpq)/bin:$PATH"
Installing PostgreSQL with Homebrew doesn't necessarily install psql, and we don't actually need to install the PostgreSQL service for this tutorial.
Install the Aiven command line tool
Aiven also provides a command line tool for working with its services, and it provides a way to run the service-specific command line tool for PostgreSQL or Valkey (respectively, psql or valkey-cli). See Get things done with the Aiven CLI for a bit more background, and the avn service cli documentation for some specifics.
We can install the Aiven CLI using pip, still in our virtual environment:
pip install aiven-client
You'll need to log in using a token. Follow the instructions at Create an authentication token to get one, and then log in at the terminal using the following command, where YOUR_EMAIL_ADDRESS is the email address you use to log in to the Aiven Console. The command will prompt you for the token you copied.
avn user login YOUR_EMAIL_ADDRESS --token
Then specify the project you want. YOUR_PROJECT_NAME will be the name of the project that you used when getting the token.
avn project switch YOUR_PROJECT_NAME
When to use avn and when to use psql
You'll notice we specified using two command line tools in this tutorial. Let's talk about when to use which.
avn is Aiven's own CLI tool. Use it if you're following along with Aiven services. avn has the advantage that once you've logged in with a token, you just need the service name to connect, rather than the whole service URI.
psql is useful to know because it's transferable to any platform, not just Aiven, and lets us query some more specific things about PostgreSQL. If you aren't using Aiven, use psql.
Create an Aiven for PostgreSQL® service
Next, let's create a PostgreSQL instance.
If you're using Aiven, you can also use the command line to do this. In this tutorial, we'll use the Aiven console.
Navigate to the Aiven console. Create an account if you haven't already, or log in if you have. If you're new to Aiven, then you can use the free plan for this tutorial, or create a free trial if you prefer.
Click Create service and create an Aiven for PostgreSQL® service.
Service cloud region: Choose the region closest to you that supports the free plan.
Service plan: Choose Free, which is OK for this tutorial.
Service name: Choose something meaningful - we're using pg-app-backend
or for a paid service:
Service type: PostgreSQL®
Cloud provider: Choose the cloud provider of your choice.
Service cloud region: Choose the region closest to you
Service plan: Choose Hobbyist or Startup (Hobbyist is OK for this tutorial)
Service name: Choose something meaningful - we're using pg-app-backend
When you're ready, click Create service.
This initializes a PostgreSQL® database for use on the cloud and region you choose, with a small service plan (if you were building a real application, you'd want to pick a larger plan).
Save the PostgreSQL connection information
When Aiven is done initializing your PostgreSQL service, it will direct you to the service's Overview page.
You can return to this page any time using the Services menu on the left hand menu and selecting the service you want to view. You can also use the Quick connect button to get convenient copy-and-paste commands and code snippets for a variety of CLI tools and programming connections!
Throughout this tutorial, we'll export common values, like the PostgreSQL service URI, as shell variables. This avoids committing potentially sensitive data to a version control system like git.
Copy the Service URI from the service page, and at the terminal prompt, set an environment variable to that string. For instance, if you're using the Bash shell:
export PG_SERVICE_URI='<the PostgreSQL Service URI>'
Remember the single quotes, because the Service URI string probably has a question mark in it.
Put some data into the database
Download the dellstore2-normal-1.0.tar.gz file from the PostgreSQL website.
Unpack it - for instance at the terminal you can use:
tar -xzf dellstore2-normal-1.0.tar.gz
Navigate to the dellstore2-normal-1.0 folder on your terminal:
cd dellstore2-normal-1.0
Next, we want to start the PostgreSQL command line tool to talk to PostgreSQL. If you've installed psql then you can do that using the Service URI you saved earlier:
psql $PG_SERVICE_URI
or you can use the avn tool, which needs to know the service name:
avn service cli pg-app-backend
In either case, that should leave you talking to the psql prompt, which will say something like:
psql (14.7 (Homebrew), server 14.6)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.

defaultdb=>
Create a new database, dellstore:
CREATE DATABASE dellstore;
Then change to the correct database:
\c dellstore
When you change to the dellstore database, the prompt should change to dellstore=>.
Import the data by typing:
\i dellstore2-normal-1.0.sql
Then check what objects have been created in the database with:
\d
The output should look like this:
              List of relations
 Schema |           Name           |   Type   |  Owner
--------+--------------------------+----------+----------
 public | categories               | table    | avnadmin
 public | categories_category_seq  | sequence | avnadmin
 public | cust_hist                | table    | avnadmin
 public | customers                | table    | avnadmin
 public | customers_customerid_seq | sequence | avnadmin
 public | inventory                | table    | avnadmin
 public | orderlines               | table    | avnadmin
 public | orders                   | table    | avnadmin
 public | orders_orderid_seq       | sequence | avnadmin
 public | products                 | table    | avnadmin
 public | products_prod_id_seq     | sequence | avnadmin
 public | reorder                  | table    | avnadmin
(12 rows)
Exit psql by typing \q, and leave the dellstore2-normal-1.0 directory by typing:
cd ..
Create a simple web application
The following is a shortened version of the first example from the FastAPI documentation.
Create a file called main.py that contains:
from fastapi import FastAPI

app = FastAPI()

@app.get("/count")
def read_count():
    return {"count": 0}
Then run it in a terminal using uvicorn:
uvicorn main:app --reload
Which should say something like:
INFO:     Will watch for changes in these directories: ['/Users/tony.ibbs/sw/aiven/pg-redis-tutorial']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [75284] using StatReload
INFO:     Started server process [75286]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
As that suggests, the --reload switch means that when the source code for main.py changes, it will automatically be reloaded, so the change will take effect without needing to restart the web server. This is nice because it means you don't have to stop and start the server every time you save a change to the source code.
We recommend keeping this terminal window open and leaving uvicorn running through the rest of the tutorial.
Go to http://127.0.0.1:8000/count in your web browser. You should see:
{"count":0}
Then, at the command line in a different terminal window, type
curl http://127.0.0.1:8000/count
The response should look like:
{"count":0}
Make it talk to the PostgreSQL database
Next, we'll change the read_count function so that it actually performs a COUNT of records in the database.
This is intentionally a slow operation: COUNT scans the entire table, record by record. In most production use cases, you wouldn't do this too often.
In real life, a better example of a slow query would be calculating the checkout price of a basket on an ecommerce site. This needs a lot of database access, which takes time, and we don't want to recalculate everything just because the customer hit "refresh".
Change the import statements at the start of the file to bring in the extra libraries we need:
import os

from fastapi import FastAPI
from fastapi import HTTPException

import psycopg
We're going to need the PostgreSQL connection information, so we'll get that from the environment variable we set earlier on:
PG_SERVICE_URI = os.getenv('PG_SERVICE_URI')
As we said before, it's a good idea not to hard-code "secrets" into source code, especially if you're going to save it to GitHub or somewhere else public. Since the PG_SERVICE_URI includes all the access information for the database, that definitely applies here. Using an environment variable means that we don't need to alter the code when the connection data changes - the same code can work for a local PostgreSQL and one running in the cloud.
Aiven partners with GitHub in its secret scanning program, which helps catch accidentally committed Aiven credentials, but it's a good idea to check manually anyway.
We keep the startup of FastApi itself:
app = FastAPI()
Then, add a function that connects to the Postgres database. Double check that the environment variable was set, and complain if it wasn't!
def connect_to_pg():
    """Connect to the PostgreSQL backend."""
    if not PG_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Database not specified - "
            "environment variable PG_SERVICE_URI is empty/unset",
        )
    try:
        # Use the given service URI, but specify a different database
        return psycopg.connect(PG_SERVICE_URI, dbname='dellstore')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to database: {e.__class__.__name__} {e}",
        )
The SERVICE_URI specifies how to connect to the defaultdb database, and our psycopg.connect call then actually asks for the dellstore database. In a real application, we'd probably specify the database name with another environment variable.
Next we can change the read_count method to ask for how many orders there are in our database (you can see a brief introduction to how to use psycopg to make SQL queries in its documentation, at Basic module usage):
@app.get("/count")
def read_count():
    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        cursor.execute('SELECT COUNT(*) FROM orders;')
        count = cursor.fetchone()[0]
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error querying database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()
    return {'count': count}
Let's test this first working version of the application.
If the uvicorn command is still running at your terminal, you should see it reload the new version of the application. Otherwise, start the application again:
uvicorn main:app --reload
and run the curl command again, in the other terminal:
curl http://127.0.0.1:8000/count
We should see an answer for the total number of orders in the database:
{"count":12000}
Why do we want caching?
When you run main.py as shown above, the code executes the COUNT every time the read_count() method is called – every time a GET query is made. We don't store the result anywhere, and thus we need to perform this expensive, slow operation every time.
Using Valkey as a cache solves this: we run one Valkey instance that all our backends can access. This lets us store the results of read_count() outside our code and our PostgreSQL database. If we're running a modern cloud-based application, other copies of our backend can access the results of read_count(), and we don't need to run expensive functions as often. Because Valkey keeps its data in memory, rather than reading and writing it from disk, we can achieve very low latencies.
Create an Aiven for Valkey service
Next, let's create an Aiven for Valkey service. As we said, Valkey will serve as our caching layer.
In most cases, you'll want to deploy Valkey to the same region as your PostgreSQL database, as this reduces latency.
Go back to the Aiven console. If you're still on the PostgreSQL service page, select Services from the navigation sidebar to get back to the "Current services" page.
Click Create service and create an Aiven for Valkey service. If you chose the free plan when creating the PostgreSQL service, then that probably makes sense for Valkey as well.
Service cloud region: Choose the same region as your PostgreSQL database.
Service plan: Choose Free, which is OK for this tutorial.
Service name: Choose something meaningful - we're using valkey-app-cache
or for a paid service:
Service type: Valkey™
Cloud provider: Choose the same cloud provider as for your PostgreSQL database.
Service cloud region: Choose the same region as your PostgreSQL database.
Service plan: Choose Hobbyist or Startup (Hobbyist is OK for this tutorial)
Service name: Choose something meaningful - we're using valkey-app-cache
When you're ready, click Create service. As before, if you were building a real application, you'd want to pick a larger plan.
Save the Valkey connection information
When Aiven is done spinning up your Valkey database, make a note of the Valkey Service URI.
Export this as an environment variable, as we did with PostgreSQL.
First, kill the uvicorn process, if it's still running, and then, if you're using Bash, type:

export VALKEY_SERVICE_URI='<the Valkey Service URI>'
Before running uvicorn, you should make sure that PG_SERVICE_URI and VALKEY_SERVICE_URI are both set, as the Python application needs both of them.
Before running psql $PG_SERVICE_URI, you should make sure that PG_SERVICE_URI is set (that sounds obvious, but it's an easy mistake to make).
Connect to Valkey and cache the GET method
Now let's add caching for the read_count function into main.py.
First, we add an import for the Valkey library. We'll also import the logging package, so we can output messages to the terminal where we're running the application to explain what we're doing with the cache.
Change the start of main.py (everything up to the app = FastAPI()) to look like the following.
import os

from fastapi import FastAPI
from fastapi import HTTPException

import logging
import psycopg
import valkey

# turn on logging
logging.basicConfig(level=logging.DEBUG)

# import environment variables
PG_SERVICE_URI = os.getenv('PG_SERVICE_URI')
VALKEY_SERVICE_URI = os.getenv('VALKEY_SERVICE_URI')

app = FastAPI()
This has added:
import statements for the valkey and logging packages.
a mapping for the environment variable we created earlier, VALKEY_SERVICE_URI
a statement to turn logging on
Now, after the connect_to_pg function, add a new connect_to_valkey() function, which will:
Take the VALKEY_SERVICE_URI variable and pass it to the valkey.from_url() function to return a Valkey client instance for us.
Catch any errors we make along the way.
The new function looks like this:
def connect_to_valkey():
    """Connect to the Valkey backend."""
    if not VALKEY_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Valkey service not specified - "
            "environment variable VALKEY_SERVICE_URI is empty/unset",
        )
    try:
        return valkey.from_url(VALKEY_SERVICE_URI)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to Valkey: {e.__class__.__name__} {e}",
        )
Finally, modify the read_count() function to add values to Valkey when called. We do this by:
Connecting to Valkey using our connect_to_valkey() function.
Creating a variable to store our cache key in, orders:count. It's a good idea to name your cache keys something meaningful.
Looking in Valkey for any values stored under the orders:count key.
If a value is found in Valkey, we return the value and exit the function.
If not, we connect to PostgreSQL and run the SELECT COUNT(*) FROM orders; statement, add that value to the Valkey cache, and return the value before exiting.
The modified read_count() function then looks like this:

@app.get("/count")
def read_count():
    cache_key = 'orders:count'
    valkey_conn = connect_to_valkey()

    # Is it already in the cache? If so, just use it
    count = valkey_conn.get(cache_key)
    if count:
        logging.debug(f'Found {cache_key}, value is {count}')
        return {'count': count}

    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        cursor.execute('SELECT COUNT(*) FROM orders;')
        count = cursor.fetchone()[0]
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error querying database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()

    # Remember to add it to the cache
    logging.debug(f'Caching {cache_key}, value is {count}')
    valkey_conn.set(cache_key, count)
    return {'count': count}
We should get back the result we expect, just as before:
{"count":12000}
but now the uvicorn output should say:
DEBUG:root:Caching orders:count, value is 12000
INFO:     127.0.0.1:51584 - "GET /count HTTP/1.1" 200 OK
If we run the same curl command again
curl http://127.0.0.1:8000/count
We should get the same output
{"count":"12000"}
but we can see from the uvicorn output that this time it has come from the cache:
DEBUG:root:Found orders:count, value is b'12000'
INFO:     127.0.0.1:51587 - "GET /count HTTP/1.1" 200 OK
The orders:count value in the DEBUG output is shown as b'12000' because Valkey strings are byte sequences, and so Python uses bytes objects to represent them. Bytes literals are shown like string literals, but with a b before the opening quote.
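You can see this behaviour in plain Python: a value read back from the cache arrives as bytes, and it's up to the application to decode or convert it (which also explains why the cached JSON response shows the count as a string):

```python
# What the Valkey client hands back is a bytes object, not a str or an int
count = b'12000'

assert isinstance(count, bytes)
assert count.decode() == '12000'   # decode to a string
assert int(count) == 12000         # or convert straight to an integer
```

If you want the GET response to always contain an integer, you could call int() on the cached value before returning it.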
Add a POST method to the application
Next, let's add a POST function to the application, so we can add new records to our PostgreSQL database.
This may seem a bit disconnected, but hang tight - you'll understand why we're doing this in the next step. Refer to the complete code if needed.
We need to add another import to main.py, this time to allow us to describe the arguments to our POST - add it after the other from import lines:
from pydantic import BaseModel
Then, at the end of the file, we define the API that we're going to use for the new order (this will correspond to the JSON we use):
class Item(BaseModel):
    """A new order."""
    orderdate: str      # e.g., 2023-03-10
    customerid: int
    netamount: float
    tax: float
And after that we can add our POST function which adds a new order to the database:
@app.post("/add/")
def post_add(item: Item):
    """Add a new order."""
    order = item.dict()
    logging.debug(f'Adding order {order}')
    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        cursor.execute(
            'INSERT INTO orders'
            ' (orderdate, customerid, netamount, tax, totalamount)'
            ' VALUES (%s, %s, %s, %s, %s);',
            (item.orderdate, item.customerid, item.netamount,
             item.tax, item.netamount + item.tax),
        )
        pg_conn.commit()
        logging.debug('Added new order')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error updating database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()
    return {'updated': {'customer': item.customerid}}
We can test it using curl:
curl -X POST http://127.0.0.1:8000/add/ \
   -H 'Content-Type: application/json' \
   -d '{"orderdate": "2023-03-10", "customerid": 9999, "netamount": 23.0, "tax": 2.0}'
And we should get the following output back:
{"updated":{"customer":9999}}
The application's debugging should show the following:
DEBUG:root:Adding order {'orderdate': '2023-03-10', 'customerid': 9999, 'netamount': 23.0, 'tax': 2.0}
DEBUG:root:Added new order
We can also examine the new database row using psql. Make sure that the PG_SERVICE_URI environment variable is set in your terminal (see Save the PostgreSQL connection information earlier), then connect:
psql $PG_SERVICE_URI
Next, change to the correct database:
\c dellstore
Then make a query:
SELECT * FROM orders ORDER BY orderid DESC LIMIT 1;
which should respond as follows:
 orderid | orderdate  | customerid | netamount | tax  | totalamount
---------+------------+------------+-----------+------+-------------
   12001 | 2023-03-10 |       9999 |     23.00 | 2.00 |       25.00
(1 row)
Again, exit from psql using \q.
But if we use curl to ask for the total number of orders again:
curl http://127.0.0.1:8000/count
We still get the old answer:
{"count":"12000"}
Here we have a problem: our cache is out of date. We only update the cache when we call read_count() for the first time. When we update the database from another method, we aren't updating the cache to reflect the change.
Dealing with out of date caches
Invalidating caches is a common problem in software development. Our cache is only as useful as it is accurate. So what do we need to do to ensure our cache is accurate?
Well, for starters, we need to ensure that our count cache is invalidated whenever we call our POST method.
Invalidating the cache
We can solve the problem of our cache getting outdated by "throwing away" the cached value when we add a new order. To do that we need to add the following lines to the start of the post_add function, just before the call to connect_to_pg() (the comments aren't strictly necessary, but they make it clearer what we're doing):
    # Invalidate the cache entry whether we succeed in doing the update or not
    # - we don't expect the update to fail, and it shouldn't hurt to clear the
    # cache a bit too often
    cache_key = 'orders:count'
    valkey_conn = connect_to_valkey()
    # The Valkey command is `del`, but that's special in Python
    valkey_conn.delete(cache_key)
The actual Valkey command is called del, but that's a Python reserved word, so it can't be used as a method name. The Valkey library chooses delete as a good alternative.
The whole function now looks like:
@app.post("/add/")
def post_add(item: Item):
    """Add a new order."""
    order = item.dict()
    logging.debug(f'Adding order {order}')

    # Invalidate the cache entry whether we succeed in doing the update or not
    # - we don't expect the update to fail, and it shouldn't hurt to clear the
    # cache a bit too often
    cache_key = 'orders:count'
    valkey_conn = connect_to_valkey()
    # The Valkey command is `del`, but that's special in Python
    valkey_conn.delete(cache_key)

    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        cursor.execute(
            'INSERT INTO orders'
            ' (orderdate, customerid, netamount, tax, totalamount)'
            ' VALUES (%s, %s, %s, %s, %s);',
            (item.orderdate, item.customerid, item.netamount,
             item.tax, item.netamount + item.tax),
        )
        pg_conn.commit()
        logging.debug('Added new order')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error updating database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()
    return {'updated': {'customer': item.customerid}}
And now when we add a new order:
curl -X POST http://127.0.0.1:8000/add/ \
   -H 'Content-Type: application/json' \
   -d '{"orderdate": "2023-03-10", "customerid": 9999, "netamount": 23.0, "tax": 2.0}'
and ask for the count:
curl http://127.0.0.1:8000/count
We get back an updated value, as we'd expect:
{"count":12002}
Your count value may be slightly different than the example above – don't worry about it, so long as it's over 12000.
Before leaving this section, let's just tidy up a little bit. We've got two definitions of cache_key in separate functions (read_count and post_add), and while they're both the same, it would be better to define that string just once.
So add another constant definition, before the app = FastAPI() line:
CACHE_KEY = 'orders:count'
Then, delete the setting of cache_key by removing this line from both the read_count and post_add functions:
    cache_key = 'orders:count'
Then change the uses of cache_key to be uses of CACHE_KEY instead. Don't forget the uses in the logging calls.
Check it's all working by running the two curl commands (the POST and the GET) once more.
Specifying a TTL ("time to live")
Next, let's set a Time to Live, or TTL.
This is a common practice when working with caches: it specifies how long we should let a cache stay valid for. As we add more functions and more microservices to our application, other applications might modify a database entry in the background. A TTL ensures that we invalidate the cache on a regular basis, reducing the chances that our cache is inaccurate to the underlying data.
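To see the mechanics, here is a tiny in-memory sketch of a TTL cache. Valkey does all of this for us (via the ex parameter we'll use below); this toy TTLCache class is purely for illustration:

```python
import time

class TTLCache:
    """A toy cache where each entry expires after ttl seconds."""

    def __init__(self, ttl):
        self.ttl = ttl
        self.store = {}  # key -> (value, expiry time)

    def set(self, key, value):
        self.store[key] = (value, time.monotonic() + self.ttl)

    def get(self, key):
        entry = self.store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            # Entry has outlived its TTL: treat it as a miss
            del self.store[key]
            return None
        return value

cache = TTLCache(ttl=0.1)
cache.set('orders:count', 12000)
assert cache.get('orders:count') == 12000  # within the TTL: a hit
time.sleep(0.2)
assert cache.get('orders:count') is None   # after the TTL: a miss
```

A miss after expiry forces the caller back to the real database, which is exactly the refresh behaviour we want.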
Let's set a new environment variable for the cache timeout in seconds. Kill the uvicorn process, and then set a new environment variable:
export CACHE_TIMEOUT=5
In main.py, let's retrieve the new environment variable, and set a default value in case the environment variable isn't set. This new code should go after the code retrieving the PG_SERVICE_URI and VALKEY_SERVICE_URI values:
CACHE_TIMEOUT = int(os.getenv('CACHE_TIMEOUT', '60'))
Next, use the cache timeout by altering the call of the Valkey set method at the end of our read_count function.
We just need to add the ex parameter to specify the timeout:
    valkey_conn.set(CACHE_KEY, count, ex=CACHE_TIMEOUT)
Now, if we POST a new order, the next GET for a count is cached, and we can see that subsequent GET requests only find a cached value if they are within the CACHE_TIMEOUT time.
Try it yourself and see
Restart the application:
uvicorn main:app --reload
Make a new POST request, to invalidate the cache.
Then make a GET request followed quickly by another. The second GET will use the cache. But if you now wait longer than the TTL and then make another GET request, the cache entry will have expired, so the database will need to be queried again.
In more detail:
Make a new POST:
curl -X POST http://127.0.0.1:8000/add/ \
   -H 'Content-Type: application/json' \
   -d '{"orderdate": "2023-03-10", "customerid": 9999, "netamount": 23.0, "tax": 2.0}'
Then a GET:
curl http://127.0.0.1:8000/count
Then another GET:
curl http://127.0.0.1:8000/count
At this point uvicorn should show the following:
DEBUG:root:Found orders:count, value is b'12003'
But if you now wait for 10 seconds (remember, the TTL is 5 seconds) and do a GET:
curl http://127.0.0.1:8000/count
The application should report the following because the cache entry has expired:
DEBUG:root:Caching orders:count, value is 12003
Using a Python decorator
There are two slight problems with the code we've got now:
Short functions that do fewer things are generally easier to understand. Our GET and POST functions are mixing up the "talk to the database" and the "handle the cache" code, which are really two different things.
If we add more GET or POST functions, we'll have to remember to add caching to them all, and it's surprisingly easy to forget to do this.
The DRY ("Don't Repeat Yourself") principle suggests "Every piece of knowledge must have a single, unambiguous, authoritative representation within a system". We can use Python decorators to re-implement our caching, cleaning up the code and helping to solve the two concerns above.
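As a warm-up, here is what such a decorator looks like in isolation, caching results in a plain dictionary rather than Valkey. The cached name and expensive_count function are invented for this sketch; our real decorators below will talk to Valkey instead:

```python
import functools

cache = {}

def cached(func):
    """Cache a zero-argument function's result under its own name."""
    @functools.wraps(func)
    def wrapper():
        if func.__name__ in cache:
            return cache[func.__name__]
        result = func()
        cache[func.__name__] = result
        return result
    return wrapper

calls = 0

@cached
def expensive_count():
    global calls
    calls += 1
    return 12000

expensive_count()  # computes the value
expensive_count()  # served from the cache, so calls stays at 1

assert calls == 1
# functools.wraps preserves the wrapped function's name and signature
assert expensive_count.__name__ == 'expensive_count'
```

The functools.wraps call matters: it makes the wrapper look like the function it wraps, which (as we'll see) is something FastAPI relies on.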
At the start of main.py, before the import os line, add:
import functools
(It's normally considered good practice to group the importing of things from the Python standard library together.)
Our first decorator is for GET functions.
Before the read_count function, add the following:
def check_cache(func):
    @functools.wraps(func)
    def wrapper():
        valkey_conn = connect_to_valkey()
        # Is it already in the cache? If so, just use it
        count = valkey_conn.get(CACHE_KEY)
        if count:
            logging.debug(f'Found {CACHE_KEY}, value is {count}')
            return {'count': count}
        result = func()
        count = result['count']
        # Remember to add it to the cache, with a timeout
        logging.debug(f'Caching {CACHE_KEY}, value is {count}')
        valkey_conn.set(CACHE_KEY, count, ex=CACHE_TIMEOUT)
        return result
    return wrapper
Then, decorate the read_count function:
@app.get("/count")
@check_cache
def read_count():
Since FastAPI has other logic in its decorators, we need a little bit of care when adding our own. First of all, it's important to have that @functools.wraps(func) line in the decorator itself, so that the wrapper has the same signature as the wrapped function. Secondly, the order of the decorators matters - the FastAPI decorator needs to come first.
We now remove the caching code from the inside of the function, leaving us with:
@app.get("/count")
@check_cache
def read_count():
    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        cursor.execute('SELECT COUNT(*) FROM orders;')
        count = cursor.fetchone()[0]
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error querying database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()
    return {'count': count}
Our second decorator is for POST functions.
Add the following before the post_add function:
def clear_cache(func):
    @functools.wraps(func)
    def wrapper(item: Item):
        # Invalidate the cache entry whether we succeed in doing the update
        # or not - it shouldn't hurt to clear the cache a bit too often
        valkey_conn = connect_to_valkey()
        # The Valkey command is `del`, but that's special in Python
        valkey_conn.delete(CACHE_KEY)
        return func(item)
    return wrapper
Next, decorate it:
@app.post("/add/")
@clear_cache
def post_add(item: Item):
The same care is needed as for the previous decorator.
Again, we now remove the caching code from inside the post_add function:
@app.post("/add/")
@clear_cache
def post_add(item: Item):
    """Add a new order."""
    order = item.dict()
    logging.debug(f'Adding order {order}')
    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        cursor.execute(
            'INSERT INTO orders'
            ' (orderdate, customerid, netamount, tax, totalamount)'
            ' VALUES (%s, %s, %s, %s, %s);',
            (item.orderdate, item.customerid, item.netamount,
             item.tax, item.netamount + item.tax),
        )
        pg_conn.commit()
        logging.debug('Added new order')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error updating database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()
    return {'updated': {'customer': item.customerid}}
Check that all continues to work by running the curl commands again, and looking at the application logs to see that the expected logging messages are still being produced.
A more realistic database call
Reviewing our application so far, we can:
Ask for the total number of orders from the Valkey cache
Post a new order for a given customer to the PostgreSQL database and refresh the cache
And we've simplified the cache handling by using decorators
Let's add another method to ask for the total number of orders per customer.
As we've discussed before, count is a slow process on its own, and you wouldn't use it in most production settings. Asking the database for a count as filtered by customerid is slower still, but a more realistic operation. Let's cache it!
This means we'll have one cache for the total number of orders, and one for each customer.
To start, we need to add another import statement before the other from imports at the top of main.py. We'll need that when we get round to updating the read_count function in a moment.
from typing import Optional
Then we need to replace the definition of CACHE_KEY at the start of main.py with the following definition of CACHE_KEY_FORMAT. This uses a format string to describe the overall structure of our cache key strings.
CACHE_KEY_FORMAT = 'orders:count:{}'
We need to alter the check_cache decorator to understand that it might be caching data for a specific customer id. So we change the signature to take an optional (integer) customerid argument, and calculate the cache key from that.
The function should end up as follows:
def check_cache(func):
    @functools.wraps(func)
    def wrapper(customerid: Optional[int] = None):
        # Calculate the cache key for this query - either for all orders,
        # or for the orders belonging to a single customer
        if customerid is None:
            cache_key = CACHE_KEY_FORMAT.format('all')
        else:
            cache_key = CACHE_KEY_FORMAT.format(customerid)
        valkey_conn = connect_to_valkey()
        # Is it already in the cache? If so, just use it
        count = valkey_conn.get(cache_key)
        if count:
            logging.debug(f'Found {cache_key}, value is {count}')
            return {'count': count}
        result = func(customerid)
        count = result['count']
        # Remember to add it to the cache, with a timeout
        logging.debug(f'Caching {cache_key}, value is {count}')
        valkey_conn.set(cache_key, count, ex=CACHE_TIMEOUT)
        return result
    return wrapper
The GET function gains the same optional customerid argument, allowing us to specify the customer id if we wish - we'll see how that's used in an actual query later on:
@app.get("/count")
@check_cache
def read_count(customerid: Optional[int] = None):
Next, let's change the database query in that function to account for whether or not the user specified a customer id. Find this line:
        cursor.execute('SELECT COUNT(*) FROM orders;')
Replace it with the following code:
        if customerid is None:
            cursor.execute('SELECT COUNT(*) FROM orders;')
        else:
            cursor.execute(
                'SELECT COUNT(*) FROM orders WHERE customerid = %s;',
                (customerid,),
            )
(It's Python, so make sure to keep the indentation correct!)
The clear_cache decorator now needs to clear the cache for all orders, and also the cache for the specific customer:
def clear_cache(func):
    @functools.wraps(func)
    def wrapper(item: Item):
        valkey_conn = connect_to_valkey()
        # Clear the cache for all orders, and for this particular customer
        valkey_conn.delete(CACHE_KEY_FORMAT.format('all'))
        valkey_conn.delete(CACHE_KEY_FORMAT.format(item.customerid))
        return func(item)
    return wrapper
We don't need to change the post_add function at all.
The curl query for all the orders is the same as before, but now we can ask for just the number of orders for a particular customer.
Now if we POST an order for customer 9999:
curl -X POST http://127.0.0.1:8000/add/ \
   -H 'Content-Type: application/json' \
   -d '{"orderdate": "2023-03-10", "customerid": 9999, "netamount": 23.0, "tax": 2.0}'
We should see the following response:
{"updated":{"customer":9999}}
In addition, we can also ask "how many orders are there for customer 9999":
curl 'http://127.0.0.1:8000/count?customerid=9999'
The answer we get back depends on how many orders we've actually created for customer 9999, but should be of the form:
{"count":4}
We can confirm this by sending a request:
curl 'http://127.0.0.1:8000/count?customerid=9999'
We'll see the same result as before, though this time it comes from the cache (the particular answer still depends on how many orders were created for customer 9999):

{"count":"4"}
All the code
As a recap, here's the final program, all in one:
import functools
import logging
import os

from typing import Optional

from fastapi import FastAPI
from fastapi import HTTPException
from pydantic import BaseModel

import psycopg
import valkey

# turn on logging
logging.basicConfig(level=logging.DEBUG)

# import environment variables
PG_SERVICE_URI = os.getenv('PG_SERVICE_URI')
VALKEY_SERVICE_URI = os.getenv('VALKEY_SERVICE_URI')
CACHE_TIMEOUT = int(os.getenv('CACHE_TIMEOUT', '60'))

CACHE_KEY_FORMAT = 'orders:count:{}'

app = FastAPI()


def connect_to_pg():
    """Connect to the PostgreSQL backend."""
    if not PG_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Database not specified - "
            "environment variable PG_SERVICE_URI is empty/unset",
        )
    try:
        # Use the given service URI, but specify a different database
        return psycopg.connect(PG_SERVICE_URI, dbname='dellstore')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to database: {e.__class__.__name__} {e}",
        )


def connect_to_valkey():
    """Connect to the Valkey backend."""
    if not VALKEY_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Valkey service not specified - "
            "environment variable VALKEY_SERVICE_URI is empty/unset",
        )
    try:
        return valkey.from_url(VALKEY_SERVICE_URI)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to Valkey: {e.__class__.__name__} {e}",
        )


def check_cache(func):
    @functools.wraps(func)
    def wrapper(customerid: Optional[int] = None):
        # Calculate the cache key - either for all orders, or for one customer
        if customerid is None:
            cache_key = CACHE_KEY_FORMAT.format('all')
        else:
            cache_key = CACHE_KEY_FORMAT.format(customerid)
        valkey_conn = connect_to_valkey()
        # Is it already in the cache? If so, just use it
        count = valkey_conn.get(cache_key)
        if count:
            logging.debug(f'Found {cache_key}, value is {count}')
            return {'count': count}
        result = func(customerid)
        count = result['count']
        # Remember to add it to the cache, with a timeout
        logging.debug(f'Caching {cache_key}, value is {count}')
        valkey_conn.set(cache_key, count, ex=CACHE_TIMEOUT)
        return result
    return wrapper


def clear_cache(func):
    @functools.wraps(func)
    def wrapper(item: Item):
        valkey_conn = connect_to_valkey()
        # Clear the cache for all orders, and for this particular customer
        valkey_conn.delete(CACHE_KEY_FORMAT.format('all'))
        valkey_conn.delete(CACHE_KEY_FORMAT.format(item.customerid))
        return func(item)
    return wrapper


class Item(BaseModel):
    """A new order."""
    orderdate: str      # e.g., 2023-03-10
    customerid: int
    netamount: float
    tax: float


@app.get("/count")
@check_cache
def read_count(customerid: Optional[int] = None):
    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        if customerid is None:
            cursor.execute('SELECT COUNT(*) FROM orders;')
        else:
            cursor.execute(
                'SELECT COUNT(*) FROM orders WHERE customerid = %s;',
                (customerid,),
            )
        count = cursor.fetchone()[0]
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error querying database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()
    return {'count': count}


@app.post("/add/")
@clear_cache
def post_add(item: Item):
    """Add a new order."""
    order = item.dict()
    logging.debug(f'Adding order {order}')
    pg_conn = connect_to_pg()
    try:
        cursor = pg_conn.cursor()
        cursor.execute(
            'INSERT INTO orders'
            ' (orderdate, customerid, netamount, tax, totalamount)'
            ' VALUES (%s, %s, %s, %s, %s);',
            (item.orderdate, item.customerid, item.netamount,
             item.tax, item.netamount + item.tax),
        )
        pg_conn.commit()
        logging.debug('Added new order')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error updating database: {e.__class__.__name__} {e}",
        )
    finally:
        pg_conn.close()
    return {'updated': {'customer': item.customerid}}
That's all, folks!
And there you have it! An application with caching on multiple values using PostgreSQL and Valkey.
Before you move on, don't forget to pause or terminate your Aiven services!
Additional resources
Check out the application's autogenerated API docs at http://127.0.0.1:8000/docs (only while uvicorn is actually running the app).
Look at FastAPI Cache as a possible solution for production settings – we've not tried it, but it looks like a useful shortcut!
We didn't discuss ETags at all, but they're another thing that a production system might consider (and fastapi-cache indicates it has at least some support for them).
Lastly, the example code we've given uses synchronous Valkey connections, which can block the event loop and degrade performance. A better option might be to use asynchronous connections, with async and await. See the Asyncio Examples in the valkey-py documentation.
If you want to learn a bit more about PostgreSQL and Valkey themselves, check out the official PostgreSQL and Valkey documentation.
psql (14.7 (Homebrew), server 14.6)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.
defaultdb=>
CREATE DATABASE dellstore;
\c dellstore
\i dellstore2-normal-1.0.sql
\d
List of relations
Schema | Name | Type | Owner
--------+--------------------------+----------+----------
public | categories | table | avnadmin
public | categories_category_seq | sequence | avnadmin
public | cust_hist | table | avnadmin
public | customers | table | avnadmin
public | customers_customerid_seq | sequence | avnadmin
public | inventory | table | avnadmin
public | orderlines | table | avnadmin
public | orders | table | avnadmin
public | orders_orderid_seq | sequence | avnadmin
public | products | table | avnadmin
public | products_prod_id_seq | sequence | avnadmin
public | reorder | table | avnadmin
(12 rows)
cd ..
from fastapi import FastAPI
app = FastAPI()@app.get("/count")defread_count():return{"count":0}
uvicorn main:app --reload
INFO: Will watch for changes in these directories: ['/Users/tony.ibbs/sw/aiven/pg-redis-tutorial']
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Started reloader process [75284] using StatReload
INFO: Started server process [75286]
INFO: Waiting for application startup.
INFO: Application startup complete.
{"count":0}
curl http://127.0.0.1:8000/count
{"count":0}
import os
from fastapi import FastAPI
from fastapi import HTTPException
import psycopg
PG_SERVICE_URI = os.getenv('PG_SERVICE_URI')
app = FastAPI()
defconnect_to_pg():"""Connect to the PostgreSQL backend."""ifnot PG_SERVICE_URI:raise HTTPException( status_code=500, detail="Internal server error. Database not specified - ""environment variable PG_SERVICE_URI is empty/unset",)try:# Use the given service URI, but specify a different databasereturn psycopg.connect(PG_SERVICE_URI, database='dellstore')except Exception as e:raise HTTPException( status_code=500, detail=f"Error connecting to database: {e.__class__.__name__}{e}",)
exportVALKEY_SERVICE_URI='<the Valkey Service URI>'
import os
from fastapi import FastAPI
from fastapi import HTTPException
import logging
import psycopg
import valkey
# turn on logging logging.basicConfig(level=logging.DEBUG)# import environment variablesPG_SERVICE_URI = os.getenv('PG_SERVICE_URI')VALKEY_SERVICE_URI = os.getenv('VALKEY_SERVICE_URI')app = FastAPI()
defconnect_to_valkey():"""Connect to the Valkey backend."""ifnot VALKEY_SERVICE_URI:raise HTTPException( status_code=500, detail="Internal server error. Valkey service not specified - ""environment variable VALKEY_SERVICE_URI is empty/unset",)try:return valkey.from_url(VALKEY_SERVICE_URI)except Exception as e:raise HTTPException( status_code=500, detail=f"Error connecting to Valkey: {e.__class__.__name__}{e}",)
@app.get("/count")defread_count(): cache_key ='orders:count' valkey_conn = connect_to_valkey()# Is it already in the cache? If so, just use it count = valkey_conn.get(cache_key)if count: logging.debug(f'Found {cache_key}, value is {count}')return{'count': count} pg_conn = connect_to_pg()try: cursor = pg_conn.cursor() cursor.execute('SELECT COUNT(*) FROM orders;') count = cursor.fetchone()[0]except Exception as e:raise HTTPException( status_code=500, detail=f"Error querying database: {e.__class__.__name__}{e}",)finally: pg_conn.close()# Remember to add it to the cache logging.debug(f'Caching {cache_key}, value is {count}') valkey_conn.set(cache_key, count)return{'count': count}
uvicorn main:app --reload
curl http://127.0.0.1:8000/count
{"count":12000}
DEBUG:root:Caching orders:count, value is 12000
INFO: 127.0.0.1:51584 - "GET /count HTTP/1.1" 200 OK
curl http://127.0.0.1:8000/count
{"count":"12000"}
DEBUG:root:Found orders:count, value is b'12000'
INFO: 127.0.0.1:51587 - "GET /count HTTP/1.1" 200 OK
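Note the subtle difference between the two responses: the first request returned `{"count":12000}` (a number), but the cached response is `{"count":"12000"}` (a string). Valkey stores values as byte strings, so `valkey_conn.get` returns bytes, which FastAPI serializes as text. If the type matters, convert before returning. A plain-Python illustration, no server needed:

```python
# A value fetched from the cache comes back as bytes, not as the int we stored
cached = b'12000'        # the shape of what valkey_conn.get('orders:count') returns
count = int(cached)      # int() happily parses bytes holding ASCII digits
print(count)             # 12000
```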
from pydantic import BaseModel
class Item(BaseModel):
    """A new order."""
    orderdate: str  # e.g., 2023-03-10
    customerid: int
    netamount: float
    tax: float
@app.post("/add/")defpost_add(item: Item):"""Add a new order.""" order = item.dict() logging.debug(f'Adding order {order}') pg_conn = connect_to_pg()try: cursor = pg_conn.cursor() cursor.execute('INSERT INTO orders'' (orderdate, customerid, netamount, tax, totalamount)'' VALUES (%s, %s, %s, %s, %s);',(item.orderdate, item.customerid, item.netamount, item.tax, item.netamount + item.tax),) pg_conn.commit() logging.debug('Added new order')except Exception as e:raise HTTPException( status_code=500, detail=f"Error updating database: {e.__class__.__name__}{e}",)finally: pg_conn.close()return{'updated':{'customer': item.customerid}}
# Invalidate the cache entry whether we succeed in doing the update or not
# - we don't expect the update to fail, and it shouldn't hurt to clear the
# cache a bit too often
cache_key = 'orders:count'
valkey_conn = connect_to_valkey()
# The Valkey command is `del`, but that's special in Python
valkey_conn.delete(cache_key)
@app.post("/add/")defpost_add(item: Item):"""Add a new order.""" order = item.dict() logging.debug(f'Adding order {order}')# Invalidate the cache entry whether we succeed in doing the update or not# - we don't expect the update to fail, and it shouldn't hurt to clear the# cache a bit too often cache_key ='orders:count' valkey_conn = connect_to_valkey()# The Valkey command is `del`, but that's special in Python valkey_conn.delete(cache_key) pg_conn = connect_to_pg()try: cursor = pg_conn.cursor() cursor.execute('INSERT INTO orders'' (orderdate, customerid, netamount, tax, totalamount)'' VALUES (%s, %s, %s, %s, %s);',(item.orderdate, item.customerid, item.netamount, item.tax, item.netamount + item.tax),) pg_conn.commit() logging.debug('Added new order')except Exception as e:raise HTTPException( status_code=500, detail=f"Error updating database: {e.__class__.__name__}{e}",)finally: pg_conn.close()return{'updated':{'customer': item.customerid}}
CACHE_TIMEOUT_SECONDS = 120

cache_timeout_str = os.getenv('CACHE_TIMEOUT_SECONDS', str(CACHE_TIMEOUT_SECONDS))
try:
    CACHE_TIMEOUT_SECONDS = int(cache_timeout_str)
except ValueError:
    logging.error(f'Bad value {cache_timeout_str}, using {CACHE_TIMEOUT_SECONDS}')
logging.info(f'Cache timeout is {CACHE_TIMEOUT_SECONDS}s')
DEBUG:root:Adding order {'orderdate': '2023-03-10', 'customerid': 9999, 'netamount': 23.0, 'tax': 2.0}
DEBUG:root:Added new order
INFO: 127.0.0.1:60078 - "POST /add/ HTTP/1.1" 200 OK
DEBUG:root:Caching orders:count, value is 12004
INFO: 127.0.0.1:60090 - "GET /count HTTP/1.1" 200 OK
DEBUG:root:Found orders:count, value is b'12004'
INFO: 127.0.0.1:60098 - "GET /count HTTP/1.1" 200 OK
curl http://127.0.0.1:8000/count
DEBUG:root:Caching orders:count, value is 12004
INFO: 127.0.0.1:60152 - "GET /count HTTP/1.1" 200 OK
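The count was looked up in PostgreSQL and re-cached here because the earlier `SET` gave the entry a time-to-live, so it had expired by the time of this request. To make the expiry behaviour concrete without a running Valkey service, here is a minimal in-memory sketch of `SET key value EX seconds` semantics (purely illustrative - not the valkey client itself):

```python
import time

class TTLCache:
    """Toy in-memory cache mimicking SET ... EX expiry."""
    def __init__(self):
        self._data = {}

    def set(self, key, value, ex):
        # Remember the value together with its expiry deadline
        self._data[key] = (value, time.monotonic() + ex)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires = entry
        if time.monotonic() >= expires:
            # Entry has outlived its TTL - behave as if it were never set
            del self._data[key]
            return None
        return value

cache = TTLCache()
cache.set('orders:count', 12004, ex=0.1)
print(cache.get('orders:count'))  # 12004
time.sleep(0.2)
print(cache.get('orders:count'))  # None - the entry has expired
```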
import functools
def check_cache(func):
    @functools.wraps(func)
    def wrapper():
        valkey_conn = connect_to_valkey()
        # Is it already in the cache? If so, just use it
        count = valkey_conn.get(CACHE_KEY)
        if count:
            logging.debug(f'Found {CACHE_KEY}, value is {count}')
            return {'count': count}
        logging.debug(f'Calling {func} to find the count')
        retval = func()
        count = retval['count']
        # Remember to add it to the cache
        logging.debug(f'Caching {CACHE_KEY}, value is {count}')
        valkey_conn.set(CACHE_KEY, count, ex=CACHE_TIMEOUT_SECONDS)
        return retval
    return wrapper
def clear_cache(func):
    @functools.wraps(func)
    def wrapper(item: Item):
        logging.debug('Clearing cache')
        valkey_conn = connect_to_valkey()
        # Invalidate the cache entry whether we succeed in doing the update or not
        # - we don't expect the update to fail, and it shouldn't hurt to clear the
        # cache a bit too often
        # The Valkey command is `del`, but that's special in Python
        valkey_conn.delete(CACHE_KEY)
        return func(item)
    return wrapper
@app.post("/add/")@clear_cachedefpost_add(item: Item):"""Add a new order.""" order = item.dict() logging.debug(f'Adding order {order}') pg_conn = connect_to_pg()try: cursor = pg_conn.cursor() cursor.execute('INSERT INTO orders'' (orderdate, customerid, netamount, tax, totalamount)'' VALUES (%s, %s, %s, %s, %s);',(item.orderdate, item.customerid, item.netamount, item.tax, item.netamount + item.tax),) pg_conn.commit() logging.debug('Added new order')except Exception as e:raise HTTPException( status_code=500, detail=f"Error updating database: {e.__class__.__name__}{e}",)finally: pg_conn.close()return{'updated':{'customer': item.customerid}}
from typing import Union
CACHE_KEY_FORMAT ='num_orders:{0}'
def check_cache(func):
    @functools.wraps(func)
    def wrapper(customerid: Union[int, None] = None, *args, **kwargs):
        valkey_conn = connect_to_valkey()
        cache_key = CACHE_KEY_FORMAT.format('all' if customerid is None else customerid)
        logging.debug(f'Checking cache key {cache_key}')
        # Is it already in the cache? If so, just use it
        count = valkey_conn.get(cache_key)
        if count:
            logging.debug(f'Found {cache_key}, value is {count}')
            return {'count': count}
        logging.debug(f'Calling {func} to find the count')
        retval = func(customerid)
        count = retval['count']
        # Remember to add it to the cache
        logging.debug(f'Caching {cache_key}, value is {count}')
        valkey_conn.set(cache_key, count, ex=CACHE_TIMEOUT_SECONDS)
        return retval
    return wrapper
def read_count(customerid: Union[int, None] = None):
cursor.execute('SELECT COUNT(*) FROM orders;')
if customerid is None:
    logging.debug('Looking up all orders')
    cursor.execute('SELECT COUNT(*) FROM orders;')
else:
    logging.debug(f'Looking up orders for customerid {customerid}')
    cursor.execute('SELECT COUNT(*) FROM orders WHERE customerid=%s;', (customerid,))
def clear_cache(func):
    @functools.wraps(func)
    def wrapper(item: Item):
        logging.debug('Clearing caches')
        all_cache_key = CACHE_KEY_FORMAT.format('all')
        this_cache_key = CACHE_KEY_FORMAT.format(item.customerid)
        logging.debug(f'Clearing caches for {all_cache_key} and {this_cache_key}')
        valkey_conn = connect_to_valkey()
        # Invalidate the cache entries whether we succeed in doing the update or not
        # - we don't expect the update to fail, in general, and it shouldn't hurt
        # to clear the cache
        # The Valkey command is `del`, but that's special in Python
        valkey_conn.delete(all_cache_key)
        valkey_conn.delete(this_cache_key)
        return func(item)
    return wrapper
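To see why `clear_cache` now deletes two keys: adding an order for a customer makes both that customer's cached count and the cached overall count stale. A small, self-contained sketch of the key scheme (same format string as in the code above):

```python
CACHE_KEY_FORMAT = 'num_orders:{0}'

def cache_key_for(customerid=None):
    # None means "count of all orders"; otherwise a per-customer count
    return CACHE_KEY_FORMAT.format('all' if customerid is None else customerid)

def keys_to_invalidate(customerid):
    # A new order for this customer stales both cached counts
    return [cache_key_for(), cache_key_for(customerid)]

print(keys_to_invalidate(9999))  # ['num_orders:all', 'num_orders:9999']
```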
import functools
import os
from typing import Union
from fastapi import FastAPI
from fastapi import HTTPException
from pydantic import BaseModel
import logging
import psycopg
import valkey
# turn on logging
logging.basicConfig(level=logging.DEBUG)

# import environment variables
PG_SERVICE_URI = os.getenv('PG_SERVICE_URI')
VALKEY_SERVICE_URI = os.getenv('VALKEY_SERVICE_URI')

CACHE_TIMEOUT_SECONDS = 120

cache_timeout_str = os.getenv('CACHE_TIMEOUT_SECONDS', str(CACHE_TIMEOUT_SECONDS))
try:
    CACHE_TIMEOUT_SECONDS = int(cache_timeout_str)
except ValueError:
    logging.error(f'Bad value {cache_timeout_str}, using {CACHE_TIMEOUT_SECONDS}')
logging.info(f'Cache timeout is {CACHE_TIMEOUT_SECONDS}s')

CACHE_KEY_FORMAT = 'num_orders:{0}'

app = FastAPI()


def connect_to_pg():
    """Connect to the PostgreSQL backend."""
    if not PG_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Database not specified - "
            "environment variable PG_SERVICE_URI is empty/unset",
        )
    try:
        # Use the given service URI, but specify a different database
        return psycopg.connect(PG_SERVICE_URI, dbname='dellstore')
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to database: {e.__class__.__name__} {e}",
        )


def connect_to_valkey():
    """Connect to the Valkey backend."""
    if not VALKEY_SERVICE_URI:
        raise HTTPException(
            status_code=500,
            detail="Internal server error. Valkey service not specified - "
            "environment variable VALKEY_SERVICE_URI is empty/unset",
        )
    try:
        return valkey.from_url(VALKEY_SERVICE_URI)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error connecting to Valkey: {e.__class__.__name__} {e}",
        )


def check_cache(func):
    @functools.wraps(func)
    def wrapper(customerid: Union[int, None] = None, *args, **kwargs):
        valkey_conn = connect_to_valkey()
        cache_key = CACHE_KEY_FORMAT.format('all' if customerid is None else customerid)
        logging.debug(f'Checking cache key {cache_key}')
        # Is it already in the cache? If so, just use it
        count = valkey_conn.get(cache_key)
        if count:
            logging.debug(f'Found {cache_key}, value is {count}')
            return {'count': count}
        logging.debug(f'Calling {func} to find the count')
        retval = func(customerid)
        count = retval['count']
        # Remember to add it to the cache
        logging.debug(f'Caching {cache_key}, value is {count}')
        valkey_conn.set(cache_key, count, ex=CACHE_TIMEOUT_SECONDS)
        return retval
    return wrapper
@app.get("/count")@check_cachedefread_count(customerid: Union[int,None]=None): pg_conn = connect_to_pg()try: cursor = pg_conn.cursor()if customerid isNone: logging.debug('Looking up all orders') cursor.execute('SELECT COUNT(*) FROM orders;')else: logging.debug(f'Looking up orders orders for custmerid {customerid}') cursor.execute('SELECT COUNT(*) FROM orders WHERE customerid=%s;',(customerid,)) count = cursor.fetchone()[0]except Exception as e:raise HTTPException( status_code=500, detail=f"Error querying database: {e.__class__.__name__}{e}",)finally: pg_conn.close()return{'count': count}classItem(BaseModel):"""A new order.""" orderdate:str# e.g., 2023-03-10 customerid:int netamount:float tax:floatdefclear_cache(func):@functools.wraps(func)defwrapper(item: Item): logging.debug('Clearing caches') all_cache_key = CACHE_KEY_FORMAT.format('all') this_cache_key = CACHE_KEY_FORMAT.format(item.customerid) logging.debug(f'Clearing caches for {all_cache_key} and {this_cache_key}') valkey_conn = connect_to_valkey()# Invalidate the cache entry whether we succeed in doing the update or not# - we don't expect the update to fail, in general, and it shouldn't hurt# to clear the cache# The Valkey command is `del`, but that's special in Python valkey_conn.delete(all_cache_key) valkey_conn.delete(this_cache_key)return func(item)return wrapper
@app.post("/add/")@clear_cachedefpost_add(item: Item):"""Add a new order.""" order = item.dict() logging.debug(f'Adding order {order}') pg_conn = connect_to_pg()try: cursor = pg_conn.cursor() cursor.execute('INSERT INTO orders'' (orderdate, customerid, netamount, tax, totalamount)'' VALUES (%s, %s, %s, %s, %s);',(item.orderdate, item.customerid, item.netamount, item.tax, item.netamount + item.tax),) pg_conn.commit() logging.debug('Added new order')except Exception as e:raise HTTPException( status_code=500, detail=f"Error updating database: {e.__class__.__name__}{e}",)finally: pg_conn.close()return{'updated':{'customer': item.customerid}}