DEV Community

Alain Airom


User authentication caching in FastAPI

Mocking user authentication with a cached bearer token to reduce latency, shorten application response times, and improve the end-user experience.


TL;DR

This post presents a very simple authentication application that mocks caching of the authentication bearer token to cut latency, based on a recent case I’m working on. It is a deliberately oversimplified solution, meant only to show the methodology.

Implementation


The user database is the following JSON file, which the code loads as credentials.json ⬇️😄

{
  "john": { "password": "password123", "role": "admin" },
  "jane": { "password": "password456", "role": "user" },
  "alice": { "password": "securePass", "role": "user" }
}

The sample FastAPI server implementation:

# server.py (FastAPI with authentication and caching)
import hashlib
import json
import secrets
from typing import Dict, Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel

app = FastAPI()

# In-memory token cache (replace with Redis or Memcached in production)
token_cache: Dict[str, dict] = {}

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class Token(BaseModel):
    access_token: str
    token_type: str


class User(BaseModel):
    username: str
    password: str


class Item(BaseModel):
    name: str
    description: Optional[str] = None


def load_credentials(filepath="credentials.json"):
    """Read the user database from disk; return {} if it is missing or invalid."""
    try:
        with open(filepath, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"Error: Credentials file '{filepath}' not found.")
        return {}
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON in '{filepath}'.")
        return {}


def get_user(username: str):
    credentials = load_credentials()
    return credentials.get(username)


def generate_token(username: str):
    # The token is random; it is tied to the user only through token_cache.
    return secrets.token_hex(32)


def verify_password(password: str, hashed_password: str):
    return hashlib.sha256(password.encode()).hexdigest() == hashed_password


def authenticate_user(user: User):
    user_data = get_user(user.username)
    if not user_data:
        return None
    # credentials.json stores plaintext passwords (demo only), so the stored
    # value is hashed here before it is compared with the submitted password.
    stored_hash = hashlib.sha256(user_data["password"].encode()).hexdigest()
    if verify_password(user.password, stored_hash):
        return user_data
    return None


def get_current_user(token: str = Depends(oauth2_scheme)):
    # Cache hit: return the user data stored at login without re-authenticating.
    if token in token_cache:
        return token_cache[token]
    # Tokens are random, so an unknown token cannot be recomputed from the
    # credentials file; anything not in the cache is rejected.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )


@app.post("/token", response_model=Token)
async def login_for_access_token(user: User):
    authenticated_user = authenticate_user(user)
    if not authenticated_user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = generate_token(user.username)
    token_cache[access_token] = authenticated_user
    return Token(access_token=access_token, token_type="bearer")


@app.get("/items/")
async def read_items(current_user: dict = Depends(get_current_user)):
    if current_user["role"] == "admin":
        return {"items": [{"name": "Foo", "description": "Admin Items"}]}
    return {"items": [{"name": "Foo", "description": "User Items"}]}


@app.post("/items/")
async def create_item(item: Item, current_user: dict = Depends(get_current_user)):
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Insufficient privileges")
    return item


@app.get("/user/")
async def get_user_info(current_user: dict = Depends(get_current_user)):
    # Returns the cached record as-is, including the password field (demo only).
    return current_user
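The token_cache dictionary in the server never expires or removes entries. As a minimal sketch of how expiry and revocation could be layered on the same idea, the class below wraps a dictionary with a time-to-live. The names `TokenCache`, `ttl_seconds`, `clock`, `issue`, `lookup`, and `revoke_user` are hypothetical and not part of the server above, and the 15-minute default is an arbitrary assumption.

```python
import secrets
import time
from typing import Callable, Dict, Optional, Tuple


class TokenCache:
    """In-memory bearer-token cache with expiry (illustrative sketch only)."""

    def __init__(self, ttl_seconds: float = 900.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        # token -> (username, user_data, expiry timestamp)
        self._entries: Dict[str, Tuple[str, dict, float]] = {}

    def issue(self, username: str, user_data: dict) -> str:
        """Create a random token for the user and cache it with an expiry."""
        token = secrets.token_hex(32)
        self._entries[token] = (username, user_data, self._clock() + self._ttl)
        return token

    def lookup(self, token: str) -> Optional[dict]:
        """Return the cached user data, or None if unknown or expired."""
        entry = self._entries.get(token)
        if entry is None:
            return None
        _, user_data, expires_at = entry
        if self._clock() >= expires_at:
            del self._entries[token]  # drop stale entries lazily
            return None
        return user_data

    def revoke_user(self, username: str) -> int:
        """Remove every token of a user (e.g. after a password change)."""
        stale = [t for t, (name, _, _) in self._entries.items() if name == username]
        for token in stale:
            del self._entries[token]
        return len(stale)
```

With something like this in place, get_current_user would call `lookup(token)` and raise the 401 whenever it returns `None`, and the login endpoint would call `issue(...)` instead of writing to the dictionary directly.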

And finally the client application!

import getpass
import json

import requests

BASE_URL = "http://127.0.0.1:8000"


def load_credentials(filepath="credentials.json"):
    """Read the user database from disk; return {} if it is missing or invalid."""
    try:
        with open(filepath, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"Error: Credentials file '{filepath}' not found.")
        return {}
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON in '{filepath}'.")
        return {}


def get_token(username):
    credentials = load_credentials()
    if username not in credentials:
        print(f"Error: Username '{username}' not found in credentials.")
        return None
    password = getpass.getpass(f"Enter password for {username}: ")
    # Local pre-check against the same credentials file the server reads (demo only).
    if credentials[username]["password"] != password:
        print("Error: Incorrect password.")
        return None
    data = {"username": username, "password": password}
    response = requests.post(f"{BASE_URL}/token", json=data)
    response.raise_for_status()
    return response.json()["access_token"]


def get_items(token):
    headers = {"Authorization": f"Bearer {token}"}
    try:
        response = requests.get(f"{BASE_URL}/items/", headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 403:
            try:
                error_message = e.response.json().get("detail")
                print(error_message)
            except json.JSONDecodeError:
                print("Error: Server returned a 403 but no valid JSON in the response.")
        else:
            raise
    return None  # a 403 was reported above


def create_item(token, item_name, item_description):
    headers = {"Authorization": f"Bearer {token}"}
    data = {"name": item_name, "description": item_description}
    try:
        response = requests.post(f"{BASE_URL}/items/", headers=headers, json=data)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 403:
            try:
                error_message = e.response.json().get("detail")
                print(error_message)
            except json.JSONDecodeError:
                print("Error: Server returned a 403 but no valid JSON in the response.")
        else:
            raise
    return None  # a 403 was reported above


def get_user_data(token):
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(f"{BASE_URL}/user/", headers=headers)
    response.raise_for_status()
    return response.json()


if __name__ == "__main__":
    while True:
        username = input("Enter username (or 'quit'/'exit' to end): ").lower()
        if username in ("quit", "exit"):
            break
        try:
            token = get_token(username)
            if token:
                print(f"Token: {token}")
                items = get_items(token)
                if items:  # only print items if the request succeeded
                    print(f"Items: {items}")
                new_item = create_item(token, "New Item", "Description of new item")
                if new_item:  # only print the new item if the request succeeded
                    print(f"Created item: {new_item}")
                user_data = get_user_data(token)
                print(f"User Data: {user_data}")
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")

And the output of the application…

Enter username (or 'quit'/'exit' to end): john
Enter password for john:
Token: b049545df8dcb70db93b1e331d8449341c42abc5a7736eae9abb6d0e9a2fadde
Items: {'items': [{'name': 'Foo', 'description': 'Admin Items'}]}
Created item: {'name': 'New Item', 'description': 'Description of new item'}
User Data: {'password': 'password123', 'role': 'admin'}
Enter username (or 'quit'/'exit' to end): jane
Enter password for jane:
Token: 2f9fca445e6e3a50a8d3bfcadcf6cdaf04ae8b469f38aee63df25508856c6f00
Items: {'items': [{'name': 'Foo', 'description': 'User Items'}]}
Insufficient privileges
User Data: {'password': 'password456', 'role': 'user'}
Enter username (or 'quit'/'exit' to end): hanz
Error: Username 'hanz' not found in credentials.
Enter username (or 'quit'/'exit' to end): end
Error: Username 'end' not found in credentials.

Conclusion

In this application, caching the bearer token can improve performance and reduce server load. Validated bearer tokens are stored together with their associated user data in an in-memory cache (a plain dictionary here, but ideally a dedicated caching system such as Redis in production), so subsequent requests carrying the same token skip the expensive authentication process. Instead of re-verifying credentials, recalculating hashes, or querying a database, the server simply retrieves the user’s information from the cache, which is much faster. This reduces latency and lets the server handle more concurrent requests, which helps scalability. Fewer database interactions and authentication calculations also mean the server consumes fewer resources per request.
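How large the gain is depends on what the cache lets the server skip. A rough way to measure it on your own machine is sketched below: it times a dictionary lookup against a deliberately slow password check. The functions `hash_password`, `slow_authenticate`, `login`, and `cached_authenticate`, the PBKDF2 parameters, and the sample user are assumptions made for illustration and do not appear in the server above.

```python
import hashlib
import secrets
import timeit

SALT = b"demo-salt"   # fixed salt for the demo only; use a random per-user salt in practice
ITERATIONS = 50_000   # arbitrary work factor chosen for illustration


def hash_password(password: str) -> bytes:
    """Derive a slow password hash with PBKDF2-HMAC-SHA256."""
    return hashlib.pbkdf2_hmac("sha256", password.encode(), SALT, ITERATIONS)


# Hypothetical user store holding a derived hash instead of the plaintext password.
USERS = {"john": {"password_hash": hash_password("password123"), "role": "admin"}}
token_cache: dict = {}


def slow_authenticate(username: str, password: str):
    """Full credential check: re-derives the password hash on every call."""
    user = USERS.get(username)
    if user is None:
        return None
    if not secrets.compare_digest(hash_password(password), user["password_hash"]):
        return None
    return {"username": username, "role": user["role"]}


def login(username: str, password: str):
    """Authenticate once, then cache the result under a random bearer token."""
    user = slow_authenticate(username, password)
    if user is None:
        return None
    token = secrets.token_hex(32)
    token_cache[token] = user
    return token


def cached_authenticate(token: str):
    """Cache path: a single dictionary lookup, no hashing."""
    return token_cache.get(token)


if __name__ == "__main__":
    token = login("john", "password123")
    full = timeit.timeit(lambda: slow_authenticate("john", "password123"), number=20)
    cached = timeit.timeit(lambda: cached_authenticate(token), number=20)
    print(f"full check: {full:.4f}s, cached lookup: {cached:.6f}s (20 calls each)")
```

The sample server hashes with a single SHA-256 pass, which is far cheaper than the PBKDF2 setting used here, so the gap measured against it would be smaller; the cache pays off most when each authentication involves a slow hash, a database query, or a call to an external identity service.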

Thanks for reading 🕴️!

Top comments (1)

John P. Rouillard

What is your plan to invalidate the cache if the user changes their password? Your cached bearer token is the same as a session cookie. How do you revoke the session cookie?

This seems to have the same issue as JWTs. At least with JWTs there is a (hopefully short) time after which the JWT credential is useless.