Geoffrey Kim

Building a Modern User Permission Management System with FastAPI, SQLAlchemy 2.0, and MariaDB

In this article, I'll guide you through designing and implementing a modern user permission management system using the latest versions of FastAPI, SQLAlchemy 2.0, and MariaDB. This comprehensive guide covers database schema design, contemporary permission models, and practical implementation using current best practices.

Recent Trends in Permission Models

While traditional Role-Based Access Control (RBAC) is still widely used, modern applications often require more sophisticated permission management approaches. Here are some key trends:

1. Attribute-Based Access Control (ABAC)

ABAC determines access rights based on user attributes, resource attributes, and environmental conditions. This approach enables highly flexible access control policies that can consider factors such as user department, resource sensitivity, and time-based restrictions.
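
As a rough illustration of the idea, the sketch below combines a user attribute, two resource attributes, and one environmental attribute into a single decision. The `AccessRequest` class, its field names, the three sensitivity levels, and the 09:00 to 17:00 window are all assumptions made for this example; they are not part of the schema built later in the article.

```python
from dataclasses import dataclass
from datetime import time


@dataclass(frozen=True)
class AccessRequest:
    """Attributes an ABAC decision looks at (all field names are illustrative)."""
    user_department: str
    resource_department: str
    resource_sensitivity: str  # "public", "internal", or "restricted"
    current_time: time


def abac_allows(req: AccessRequest) -> bool:
    """Return True when the combined attributes permit access."""
    if req.resource_sensitivity == "public":
        return True
    same_department = req.user_department == req.resource_department
    if req.resource_sensitivity == "internal":
        return same_department
    if req.resource_sensitivity == "restricted":
        # Restricted resources add a time-based (environmental) condition.
        in_business_hours = time(9, 0) <= req.current_time <= time(17, 0)
        return same_department and in_business_hours
    return False  # unknown sensitivity level: deny by default


request = AccessRequest("finance", "finance", "restricted", time(10, 30))
print(abac_allows(request))  # True
```

The point of the sketch is that no role name appears anywhere: the decision depends only on attribute values, which is what distinguishes ABAC from plain RBAC.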

2. Policy-Based Access Control (PBAC)

PBAC manages access through centralized policies that define specific conditions. These policies can include complex rules like "allow access to financial data only for finance department employees during business hours."
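
The quoted rule can be sketched as one entry in a central policy list. Everything here is hypothetical: the `Policy` class, the `POLICIES` registry, the `financial_data` resource name, and the assumption that "business hours" means 09:00 to 17:00.

```python
from dataclasses import dataclass
from datetime import time
from typing import Any, Callable, Dict, List

Context = Dict[str, Any]


@dataclass(frozen=True)
class Policy:
    name: str
    resource: str
    action: str
    condition: Callable[[Context], bool]


def finance_in_business_hours(ctx: Context) -> bool:
    """Finance department employees, between 09:00 and 17:00 only."""
    now = ctx.get("current_time")
    return (
        ctx.get("department") == "finance"
        and now is not None
        and time(9, 0) <= now <= time(17, 0)
    )


# The single, central place where policies are declared.
POLICIES: List[Policy] = [
    Policy("finance-data-business-hours", "financial_data", "read",
           finance_in_business_hours),
]


def is_allowed(resource: str, action: str, ctx: Context) -> bool:
    """Deny by default; allow when any matching policy's condition holds."""
    return any(
        p.resource == resource and p.action == action and p.condition(ctx)
        for p in POLICIES
    )


print(is_allowed("financial_data", "read",
                 {"department": "finance", "current_time": time(11, 0)}))  # True
```

Because every rule lives in `POLICIES`, changing who may read financial data means editing one list rather than hunting through endpoint code.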

3. JSON-Based Permission Management

Storing permissions in JSON format allows for flexible, hierarchical structures that can represent complex permission sets and condition-based rules.
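
To make this concrete, here is a small sketch of a hierarchical permission document and a lookup helper. The layout (resource, then action, then a rule with optional conditions) and the `find_rule` helper are assumptions chosen for illustration; the schema later in this article stores a JSON `conditions` value per permission row instead of one large document.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical permission document: resource -> action -> rule.
PERMISSIONS_JSON = """
{
  "reports": {
    "read":   {"conditions": {"time_between": ["09:00", "17:00"]}},
    "export": {"conditions": {"department": "finance"}}
  },
  "users": {
    "list": {"conditions": null}
  }
}
"""


def find_rule(doc: Dict[str, Any], resource: str, action: str) -> Optional[Dict[str, Any]]:
    """Return the rule for (resource, action), or None when nothing grants it."""
    return doc.get(resource, {}).get(action)


permissions = json.loads(PERMISSIONS_JSON)
print(find_rule(permissions, "reports", "read"))
print(find_rule(permissions, "users", "delete"))  # None
```

A missing entry means "not granted", while a rule whose conditions are null means "granted unconditionally"; keeping those two cases distinct matters when the conditions are evaluated later.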

4. User Groups and Hierarchies

Incorporating user groups and hierarchical structures streamlines permission management by allowing administrators to assign permissions to entire groups rather than individual users.
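
A minimal sketch of how group membership widens a user's role set is shown below. The dictionaries and the user, group, and role names are made up for the example; they stand in for rows that a real system would keep in relationship tables.

```python
from typing import Dict, Set

# Hypothetical in-memory assignments standing in for database rows.
USER_ROLES: Dict[str, Set[str]] = {"alice": {"auditor"}}
USER_GROUPS: Dict[str, Set[str]] = {"alice": {"finance-team"}, "bob": {"finance-team"}}
GROUP_ROLES: Dict[str, Set[str]] = {"finance-team": {"report-viewer", "invoice-editor"}}


def effective_roles(username: str) -> Set[str]:
    """Union of roles held directly and roles inherited from groups."""
    roles = set(USER_ROLES.get(username, set()))
    for group in USER_GROUPS.get(username, set()):
        roles |= GROUP_ROLES.get(group, set())
    return roles


print(sorted(effective_roles("alice")))  # ['auditor', 'invoice-editor', 'report-viewer']
print(sorted(effective_roles("bob")))    # ['invoice-editor', 'report-viewer']
```

Granting a new role to `finance-team` reaches both users at once, which is the administrative saving the paragraph above describes.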

5. Integration with External Authentication

Using standards like OAuth 2.0 and OpenID Connect enables Single Sign-On (SSO) and consistent authentication across multiple systems.

Database Design

Let's design a database schema that supports these modern approaches.

Core Tables

CREATE TABLE users (
    id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL UNIQUE,
    password_hash VARCHAR(255) NOT NULL,
    email VARCHAR(100) NOT NULL UNIQUE,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE roles (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(50) NOT NULL UNIQUE,
    description TEXT
);

CREATE TABLE permissions (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL UNIQUE,
    resource VARCHAR(100) NOT NULL,
    action VARCHAR(50) NOT NULL,
    conditions JSON,
    description TEXT,
    UNIQUE KEY resource_action (resource, action)
);

CREATE TABLE groups (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL UNIQUE,
    description TEXT
);

Relationship Tables

CREATE TABLE user_roles (
    user_id INT,
    role_id INT,
    PRIMARY KEY (user_id, role_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);

CREATE TABLE role_permissions (
    role_id INT,
    permission_id INT,
    PRIMARY KEY (role_id, permission_id),
    FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE,
    FOREIGN KEY (permission_id) REFERENCES permissions(id) ON DELETE CASCADE
);

CREATE TABLE user_permissions (
    user_id INT,
    permission_id INT,
    PRIMARY KEY (user_id, permission_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    FOREIGN KEY (permission_id) REFERENCES permissions(id) ON DELETE CASCADE
);

CREATE TABLE user_groups (
    user_id INT,
    group_id INT,
    PRIMARY KEY (user_id, group_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE
);

CREATE TABLE group_roles (
    group_id INT,
    role_id INT,
    PRIMARY KEY (group_id, role_id),
    FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE,
    FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);

CREATE TABLE audit_logs (
    id INT PRIMARY KEY AUTO_INCREMENT,
    user_id INT,
    action VARCHAR(50) NOT NULL,
    resource_type VARCHAR(50) NOT NULL,
    resource_id INT,
    details JSON,
    ip_address VARCHAR(45),
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL
);
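
To see how these relationship tables combine, the sketch below resolves one user's effective permissions with a three-part UNION: direct grants, grants through roles, and grants through group roles. It is only an illustration. It runs against an in-memory SQLite database with a cut-down version of the schema (no foreign keys, no `users`, `roles`, or `groups` tables), and the sample rows and the `effective_permissions` helper are invented for the example.

```python
import sqlite3
from typing import List, Tuple

# Reduced schema plus sample rows; ids are arbitrary illustration values.
SCHEMA_AND_DATA = """
CREATE TABLE permissions (id INTEGER PRIMARY KEY, resource TEXT, action TEXT);
CREATE TABLE user_permissions (user_id INTEGER, permission_id INTEGER);
CREATE TABLE user_roles (user_id INTEGER, role_id INTEGER);
CREATE TABLE role_permissions (role_id INTEGER, permission_id INTEGER);
CREATE TABLE user_groups (user_id INTEGER, group_id INTEGER);
CREATE TABLE group_roles (group_id INTEGER, role_id INTEGER);

INSERT INTO permissions VALUES
    (1, 'reports', 'read'), (2, 'users', 'list'),
    (3, 'users', 'create'), (4, 'users', 'delete');
INSERT INTO user_permissions VALUES (1, 1);    -- direct grant to user 1
INSERT INTO user_roles VALUES (1, 10);         -- user 1 holds role 10
INSERT INTO role_permissions VALUES (10, 2), (20, 3);
INSERT INTO user_groups VALUES (1, 100);       -- user 1 is in group 100
INSERT INTO group_roles VALUES (100, 20);      -- group 100 carries role 20
"""

EFFECTIVE_PERMISSIONS = """
SELECT p.resource, p.action
FROM permissions p
JOIN user_permissions up ON up.permission_id = p.id
WHERE up.user_id = :uid
UNION
SELECT p.resource, p.action
FROM permissions p
JOIN role_permissions rp ON rp.permission_id = p.id
JOIN user_roles ur ON ur.role_id = rp.role_id
WHERE ur.user_id = :uid
UNION
SELECT p.resource, p.action
FROM permissions p
JOIN role_permissions rp ON rp.permission_id = p.id
JOIN group_roles gr ON gr.role_id = rp.role_id
JOIN user_groups ug ON ug.group_id = gr.group_id
WHERE ug.user_id = :uid
"""


def effective_permissions(conn: sqlite3.Connection, user_id: int) -> List[Tuple[str, str]]:
    """Return the sorted (resource, action) pairs the user holds by any path."""
    rows = conn.execute(EFFECTIVE_PERMISSIONS, {"uid": user_id}).fetchall()
    return sorted(rows)


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA_AND_DATA)
print(effective_permissions(conn, 1))
# [('reports', 'read'), ('users', 'create'), ('users', 'list')]
```

UNION (rather than UNION ALL) removes duplicates when the same permission reaches a user by more than one path. The query ignores the `conditions` column; conditional permissions still need to be evaluated in application code.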

Setting Up FastAPI and SQLAlchemy 2.0

Database Models with SQLAlchemy 2.0

Let's define our models using SQLAlchemy 2.0's latest patterns:

from datetime import datetime
from typing import Any, Dict, List, Optional

from sqlalchemy import (
    JSON, TIMESTAMP, Boolean, Column, ForeignKey, Integer, String, Table, Text,
    UniqueConstraint, func,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


# Base class for SQLAlchemy models using 2.0 style
class Base(DeclarativeBase):
    pass


# Association tables for many-to-many relationships.
# Both columns form the composite primary key, matching the SQL schema above.
user_roles = Table(
    'user_roles',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
)

role_permissions = Table(
    'role_permissions',
    Base.metadata,
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id', ondelete='CASCADE'), primary_key=True),
)

user_permissions = Table(
    'user_permissions',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id', ondelete='CASCADE'), primary_key=True),
)

user_groups = Table(
    'user_groups',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), primary_key=True),
    Column('group_id', Integer, ForeignKey('groups.id', ondelete='CASCADE'), primary_key=True),
)

group_roles = Table(
    'group_roles',
    Base.metadata,
    Column('group_id', Integer, ForeignKey('groups.id', ondelete='CASCADE'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
)


class User(Base):
    __tablename__ = 'users'

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    password_hash: Mapped[str] = mapped_column(String(255))
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())

    roles: Mapped[List["Role"]] = relationship(secondary=user_roles, back_populates="users")
    direct_permissions: Mapped[List["Permission"]] = relationship(secondary=user_permissions, back_populates="users")
    groups: Mapped[List["Group"]] = relationship(secondary=user_groups, back_populates="users")
    audit_logs: Mapped[List["AuditLog"]] = relationship(back_populates="user")


class Role(Base):
    __tablename__ = 'roles'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    users: Mapped[List["User"]] = relationship(secondary=user_roles, back_populates="roles")
    permissions: Mapped[List["Permission"]] = relationship(secondary=role_permissions, back_populates="roles")
    groups: Mapped[List["Group"]] = relationship(secondary=group_roles, back_populates="roles")


class Permission(Base):
    __tablename__ = 'permissions'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    resource: Mapped[str] = mapped_column(String(100))
    action: Mapped[str] = mapped_column(String(50))
    conditions: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, nullable=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    roles: Mapped[List["Role"]] = relationship(secondary=role_permissions, back_populates="permissions")
    users: Mapped[List["User"]] = relationship(secondary=user_permissions, back_populates="direct_permissions")

    __table_args__ = (
        # Mirrors UNIQUE KEY resource_action (resource, action) in the SQL schema
        UniqueConstraint('resource', 'action', name='resource_action'),
        {'sqlite_autoincrement': True},  # For SQLite, if used
    )


class Group(Base):
    __tablename__ = 'groups'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    users: Mapped[List["User"]] = relationship(secondary=user_groups, back_populates="groups")
    roles: Mapped[List["Role"]] = relationship(secondary=group_roles, back_populates="groups")


class AuditLog(Base):
    __tablename__ = 'audit_logs'

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[Optional[int]] = mapped_column(ForeignKey('users.id', ondelete='SET NULL'), nullable=True)
    action: Mapped[str] = mapped_column(String(50))
    resource_type: Mapped[str] = mapped_column(String(50))
    resource_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    details: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, nullable=True)
    ip_address: Mapped[Optional[str]] = mapped_column(String(45), nullable=True)
    timestamp: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())

    user: Mapped[Optional["User"]] = relationship(back_populates="audit_logs")

Database Setup with Async Support

from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import Session, sessionmaker

# For async database operations (requires the aiomysql driver)
DATABASE_URL = "mariadb+aiomysql://user:password@localhost/dbname"

async_engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # logs every SQL statement; turn off in production
)

# async_sessionmaker is the SQLAlchemy 2.0 factory for AsyncSession objects
AsyncSessionLocal = async_sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# For sync operations (if needed; requires the pymysql driver)
SYNC_DATABASE_URL = "mariadb+pymysql://user:password@localhost/dbname"

sync_engine = create_engine(
    SYNC_DATABASE_URL,
    echo=True,
)

SyncSessionLocal = sessionmaker(
    sync_engine,
    class_=Session,
    expire_on_commit=False
)


# Creating tables (Base is the declarative base from the models above)
async def create_tables():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

Pydantic Models with Pydantic v2

from datetime import datetime
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ConfigDict, EmailStr, field_validator


def validate_password_strength(v: str) -> str:
    # Shared by UserCreate and PasswordChange so the rules stay in one place
    if len(v) < 8:
        raise ValueError('Password must be at least 8 characters')
    if not any(char.isdigit() for char in v):
        raise ValueError('Password must contain at least one digit')
    if not any(char.isupper() for char in v):
        raise ValueError('Password must contain at least one uppercase letter')
    return v


# User models
class UserBase(BaseModel):
    username: str
    email: EmailStr


class UserCreate(UserBase):
    password: str

    @field_validator('password')
    @classmethod
    def password_strength(cls, v: str) -> str:
        return validate_password_strength(v)


class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    model_config = ConfigDict(from_attributes=True)


class UserDetails(UserResponse):
    roles: List["RoleResponse"] = []
    groups: List["GroupResponse"] = []

    model_config = ConfigDict(from_attributes=True)


# Role models
class RoleBase(BaseModel):
    name: str
    description: Optional[str] = None


class RoleCreate(RoleBase):
    pass


class RoleResponse(RoleBase):
    id: int

    model_config = ConfigDict(from_attributes=True)


class RoleDetails(RoleResponse):
    permissions: List["PermissionResponse"] = []

    model_config = ConfigDict(from_attributes=True)


# Permission models
class PermissionBase(BaseModel):
    name: str
    resource: str
    action: str
    description: Optional[str] = None
    conditions: Optional[Dict[str, Any]] = None


class PermissionCreate(PermissionBase):
    pass


class PermissionResponse(PermissionBase):
    id: int

    model_config = ConfigDict(from_attributes=True)


# Group models
class GroupBase(BaseModel):
    name: str
    description: Optional[str] = None


class GroupCreate(GroupBase):
    pass


class GroupResponse(GroupBase):
    id: int

    model_config = ConfigDict(from_attributes=True)


class GroupDetails(GroupResponse):
    users: List[UserResponse] = []
    roles: List[RoleResponse] = []

    model_config = ConfigDict(from_attributes=True)


# Update models
class UserUpdate(BaseModel):
    username: Optional[str] = None
    email: Optional[EmailStr] = None
    is_active: Optional[bool] = None


class PasswordChange(BaseModel):
    current_password: str
    new_password: str

    @field_validator('new_password')
    @classmethod
    def password_strength(cls, v: str) -> str:
        return validate_password_strength(v)


# Token models
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenPayload(BaseModel):
    sub: Optional[str] = None
    exp: Optional[int] = None


# Resolve the forward references used above
UserDetails.model_rebuild()
RoleDetails.model_rebuild()
GroupDetails.model_rebuild()

Authentication and Authorization Utilities

import ipaddress
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

# User, Role, Group, AuditLog and AsyncSessionLocal come from the model and
# database setup code above.

# Security configuration
SECRET_KEY = "your-secret-key-here"  # In production, use environment variables
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Must match the login route, which is mounted under the /auth prefix
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")


# Database dependency
async def get_async_db():
    async with AsyncSessionLocal() as session:
        yield session


# Password utilities
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)


# JWT token functions
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# User authentication: returns the user, or None when authentication fails
async def authenticate_user(db: AsyncSession, username: str, password: str) -> Optional[User]:
    stmt = select(User).where(User.username == username)
    result = await db.execute(stmt)
    user = result.scalars().first()

    if not user or not verify_password(password, user.password_hash):
        return None
    if not user.is_active:
        return None
    return user


# Get current user from token
async def get_current_user(
    db: AsyncSession = Depends(get_async_db),
    token: str = Depends(oauth2_scheme)
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: Optional[str] = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    # Eager-load everything has_permission walks. Lazy loading is not
    # available on an AsyncSession and would raise at attribute access.
    stmt = (
        select(User)
        .where(User.username == username)
        .options(
            selectinload(User.direct_permissions),
            selectinload(User.roles).selectinload(Role.permissions),
            selectinload(User.groups).selectinload(Group.roles).selectinload(Role.permissions),
        )
    )
    result = await db.execute(stmt)
    user = result.scalars().first()

    if user is None:
        raise credentials_exception
    if not user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return user


# Get current active user
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


def ip_in_ranges(ip: str, ranges: List[str]) -> bool:
    # Treat each entry as a CIDR block (a bare address counts as a /32 or /128)
    try:
        address = ipaddress.ip_address(ip)
        return any(address in ipaddress.ip_network(r, strict=False) for r in ranges)
    except ValueError:
        return False


# ABAC Permission verification
async def has_permission(
    user: User,
    resource: str,
    action: str,
    resource_id: Optional[int] = None,
    context: Optional[Dict[str, Any]] = None
) -> bool:
    """
    Check if user has permission to perform an action on a resource.

    Implements Attribute-Based Access Control (ABAC) by considering:
    - Direct user permissions
    - Role-based permissions
    - Group-based permissions
    - Contextual conditions
    """
    context = context or {}

    # Helper function to check conditions
    def evaluate_conditions(conditions: Optional[Dict[str, Any]], context: Dict[str, Any]) -> bool:
        if not conditions:
            return True

        # Example condition: {"time_between": ["09:00", "17:00"]}
        for condition_key, condition_value in conditions.items():
            if condition_key == "time_between":
                current_time = context.get("current_time", datetime.now().time())
                start_time = datetime.strptime(condition_value[0], "%H:%M").time()
                end_time = datetime.strptime(condition_value[1], "%H:%M").time()
                if not (start_time <= current_time <= end_time):
                    return False
            elif condition_key == "ip_range":
                ip = context.get("ip_address")
                if not ip or not ip_in_ranges(ip, condition_value):
                    return False
            # Add more condition types as needed.
            # Keys without a branch here are currently ignored.
        return True

    def grants(permission) -> bool:
        return (permission.resource == resource
                and permission.action == action
                and evaluate_conditions(permission.conditions, context))

    # Check direct user permissions
    for permission in user.direct_permissions:
        if grants(permission):
            return True

    # Check role-based permissions
    for role in user.roles:
        for permission in role.permissions:
            if grants(permission):
                return True

    # Check group-based permissions (through roles)
    for group in user.groups:
        for role in group.roles:
            for permission in role.permissions:
                if grants(permission):
                    return True

    return False


# Permission dependency for FastAPI routes
def require_permission(resource: str, action: str):
    async def permission_dependency(
        request: Request,
        current_user: User = Depends(get_current_active_user)
    ):
        context = {
            "current_time": datetime.now().time(),
            "ip_address": request.client.host if request.client else None
        }
        if not await has_permission(current_user, resource, action, context=context):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied: {action} on {resource}"
            )
        return current_user
    return permission_dependency


# Audit logging utility
async def create_audit_log(
    db: AsyncSession,
    user_id: Optional[int],
    action: str,
    resource_type: str,
    resource_id: Optional[int] = None,
    details: Optional[Dict[str, Any]] = None,
    ip_address: Optional[str] = None
) -> AuditLog:
    audit_log = AuditLog(
        user_id=user_id,
        action=action,
        resource_type=resource_type,
        resource_id=resource_id,
        details=details,
        ip_address=ip_address
    )
    db.add(audit_log)
    await db.commit()
    await db.refresh(audit_log)
    return audit_log

FastAPI Endpoints with Modern Patterns

from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import (
    APIRouter, BackgroundTasks, Depends, FastAPI, HTTPException, Request, Response, status,
)
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import delete, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload


# Lifespan handler for database setup (replaces the deprecated on_event hook)
@asynccontextmanager
async def lifespan(app: FastAPI):
    await create_tables()
    yield


app = FastAPI(
    title="Permission Management API",
    description="Modern API for user permission management",
    version="2.0.0",
    lifespan=lifespan,
)

# Routers for better organization
auth_router = APIRouter(prefix="/auth", tags=["Authentication"])
user_router = APIRouter(prefix="/users", tags=["User Management"])
role_router = APIRouter(prefix="/roles", tags=["Role Management"])
permission_router = APIRouter(prefix="/permissions", tags=["Permission Management"])
group_router = APIRouter(prefix="/groups", tags=["Group Management"])


def client_ip(request: Request) -> Optional[str]:
    return request.client.host if request.client else None


async def log_audit_event(**kwargs) -> None:
    # Background tasks run after the response is sent, when the request-scoped
    # session may already be closed, so each task opens a session of its own.
    async with AsyncSessionLocal() as session:
        await create_audit_log(session, **kwargs)


# Authentication endpoints
@auth_router.post("/token", response_model=Token)
async def login_for_access_token(
    request: Request,
    background_tasks: BackgroundTasks,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_async_db)
):
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )

    # Log successful login in background
    background_tasks.add_task(
        log_audit_event,
        user_id=user.id,
        action="login",
        resource_type="auth",
        details={"success": True},
        ip_address=client_ip(request)
    )

    return {"access_token": access_token, "token_type": "bearer"}


# User endpoints
@user_router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user: UserCreate,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("users", "create"))
):
    try:
        hashed_password = get_password_hash(user.password)
        db_user = User(
            username=user.username,
            email=user.email,
            password_hash=hashed_password
        )
        db.add(db_user)
        await db.commit()
        await db.refresh(db_user)

        # Log user creation in background
        background_tasks.add_task(
            log_audit_event,
            user_id=current_user.id,
            action="create",
            resource_type="users",
            resource_id=db_user.id,
            details={"username": user.username, "email": user.email},
            ip_address=client_ip(request)
        )

        return db_user
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username or email already exists"
        )


@user_router.get("/", response_model=List[UserResponse])
async def get_users(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("users", "list"))
):
    stmt = select(User).offset(skip).limit(limit)
    result = await db.execute(stmt)
    users = result.scalars().all()
    return users


@user_router.get("/{user_id}", response_model=UserDetails)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("users", "read"))
):
    # UserDetails serializes roles and groups, so load them up front
    stmt = (
        select(User)
        .where(User.id == user_id)
        .options(selectinload(User.roles), selectinload(User.groups))
    )
    result = await db.execute(stmt)
    db_user = result.scalars().first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user


@user_router.put("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user_update: UserUpdate,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("users", "update"))
):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    db_user = result.scalars().first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    update_data = user_update.model_dump(exclude_unset=True)

    try:
        if update_data:
            stmt = (
                update(User)
                .where(User.id == user_id)
                .values(**update_data)
            )
            await db.execute(stmt)
            await db.commit()

            # Refresh user data
            stmt = select(User).where(User.id == user_id)
            result = await db.execute(stmt)
            db_user = result.scalars().first()

            # Log user update in background
            background_tasks.add_task(
                log_audit_event,
                user_id=current_user.id,
                action="update",
                resource_type="users",
                resource_id=user_id,
                details=update_data,
                ip_address=client_ip(request)
            )

        return db_user
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username or email already exists"
        )


@user_router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("users", "delete"))
):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    db_user = result.scalars().first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Store username for audit log
    username = db_user.username

    stmt = delete(User).where(User.id == user_id)
    await db.execute(stmt)
    await db.commit()

    # Log user deletion in background
    background_tasks.add_task(
        log_audit_event,
        user_id=current_user.id,
        action="delete",
        resource_type="users",
        resource_id=user_id,
        details={"username": username},
        ip_address=client_ip(request)
    )

    return Response(status_code=status.HTTP_204_NO_CONTENT)


# Role endpoints
@role_router.post("/", response_model=RoleResponse, status_code=status.HTTP_201_CREATED)
async def create_role(
    role: RoleCreate,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("roles", "create"))
):
    try:
        db_role = Role(**role.model_dump())
        db.add(db_role)
        await db.commit()
        await db.refresh(db_role)

        # Log role creation in background
        background_tasks.add_task(
            log_audit_event,
            user_id=current_user.id,
            action="create",
            resource_type="roles",
            resource_id=db_role.id,
            details=role.model_dump(),
            ip_address=client_ip(request)
        )

        return db_role
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Role name already exists"
        )


# Permission Router Implementation
@permission_router.post("/", response_model=PermissionResponse, status_code=status.HTTP_201_CREATED)
async def create_permission(
    permission: PermissionCreate,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("permissions", "create"))
):
    try:
        db_permission = Permission(**permission.model_dump())
        db.add(db_permission)
        await db.commit()
        await db.refresh(db_permission)

        # Log permission creation in background
        background_tasks.add_task(
            log_audit_event,
            user_id=current_user.id,
            action="create",
            resource_type="permissions",
            resource_id=db_permission.id,
            details=permission.model_dump(),
            ip_address=client_ip(request)
        )

        return db_permission
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Permission with this resource and action already exists"
        )


@permission_router.get("/", response_model=List[PermissionResponse])
async def get_permissions(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("permissions", "list"))
):
    stmt = select(Permission).offset(skip).limit(limit)
    result = await db.execute(stmt)
    permissions = result.scalars().all()
    return permissions


@permission_router.get("/{permission_id}", response_model=PermissionResponse)
async def get_permission(
    permission_id: int,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("permissions", "read"))
):
    stmt = select(Permission).where(Permission.id == permission_id)
    result = await db.execute(stmt)
    db_permission = result.scalars().first()

    if db_permission is None:
        raise HTTPException(status_code=404, detail="Permission not found")
    return db_permission


@permission_router.put("/{permission_id}", response_model=PermissionResponse)
async def update_permission(
    permission_id: int,
    permission_update: PermissionCreate,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("permissions", "update"))
):
    stmt = select(Permission).where(Permission.id == permission_id)
    result = await db.execute(stmt)
    db_permission = result.scalars().first()

    if db_permission is None:
        raise HTTPException(status_code=404, detail="Permission not found")

    update_data = permission_update.model_dump()

    try:
        stmt = (
            update(Permission)
            .where(Permission.id == permission_id)
            .values(**update_data)
        )
        await db.execute(stmt)
        await db.commit()

        # Refresh permission data
        stmt = select(Permission).where(Permission.id == permission_id)
        result = await db.execute(stmt)
        db_permission = result.scalars().first()

        # Log permission update in background
        background_tasks.add_task(
            log_audit_event,
            user_id=current_user.id,
            action="update",
            resource_type="permissions",
            resource_id=permission_id,
            details=update_data,
            ip_address=client_ip(request)
        )

        return db_permission
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Permission with this resource and action already exists"
        )


@permission_router.delete("/{permission_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_permission(
    permission_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("permissions", "delete"))
):
    stmt = select(Permission).where(Permission.id == permission_id)
    result = await db.execute(stmt)
    db_permission = result.scalars().first()

    if db_permission is None:
        raise HTTPException(status_code=404, detail="Permission not found")

    # Store permission details for audit log
    permission_details = {
        "name": db_permission.name,
        "resource": db_permission.resource,
        "action": db_permission.action
    }

    stmt = delete(Permission).where(Permission.id == permission_id)
    await db.execute(stmt)
    await db.commit()

    # Log permission deletion in background
    background_tasks.add_task(
        log_audit_event,
        user_id=current_user.id,
        action="delete",
        resource_type="permissions",
        resource_id=permission_id,
        details=permission_details,
        ip_address=client_ip(request)
    )

    return Response(status_code=status.HTTP_204_NO_CONTENT)


@permission_router.post("/{permission_id}/assign-to-role/{role_id}", status_code=status.HTTP_200_OK)
async def assign_permission_to_role(
    permission_id: int,
    role_id: int,
    request: Request,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("permissions", "assign"))
):
    # Check if permission exists
    permission_stmt = select(Permission).where(Permission.id == permission_id)
    permission_result = await db.execute(permission_stmt)
    permission = permission_result.scalars().first()

    if permission is None:
        raise HTTPException(status_code=404, detail="Permission not found")

    # Check if role exists; load its permissions eagerly for the membership test
    role_stmt = (
        select(Role)
        .where(Role.id == role_id)
        .options(selectinload(Role.permissions))
    )
    role_result = await db.execute(role_stmt)
    role = role_result.scalars().first()

    if role is None:
        raise HTTPException(status_code=404, detail="Role not found")

    # Add permission to role if not already assigned
    if permission not in role.permissions:
        role.permissions.append(permission)
        await db.commit()

        # Log assignment in background
        background_tasks.add_task(
            log_audit_event,
            user_id=current_user.id,
            action="assign",
            resource_type="permissions",
            resource_id=permission_id,
            details={"role_id": role_id, "permission_id": permission_id},
            ip_address=client_ip(request)
        )

    return {"message": f"Permission '{permission.name}' assigned to role '{role.name}'"}


# Brief Group Router Implementation
# Note: The Group Router follows patterns similar to the User and Role routers
@group_router.post("/", response_model=GroupResponse, status_code=status.HTTP_201_CREATED)
async def create_group(
    group: GroupCreate,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("groups", "create"))
):
    try:
        db_group = Group(**group.model_dump())
        db.add(db_group)
        await db.commit()
        await db.refresh(db_group)
        return db_group
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Group name already exists"
        )


@group_router.get("/", response_model=List[GroupResponse])
async def get_groups(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("groups", "list"))
):
    stmt = select(Group).offset(skip).limit(limit)
    result = await db.execute(stmt)
    groups = result.scalars().all()
    return groups


# Additional group endpoints would include:
# - get_group(group_id)
# - update_group(group_id, group_update)
# - delete_group(group_id)
# - add_user_to_group(group_id, user_id)
# - remove_user_from_group(group_id, user_id)
# - assign_role_to_group(group_id, role_id)
# - remove_role_from_group(group_id, role_id)
# These would follow similar patterns to the other routers


# Permission check endpoint (useful for frontend authorization)
@auth_router.get("/check-permission")
async def check_user_permission(
    resource: str,
    action: str,
    request: Request,
    resource_id: Optional[int] = None,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(get_current_active_user)
):
    context = {
        "current_time": datetime.now().time(),
        "ip_address": client_ip(request)
    }

    has_access = await has_permission(
        user=current_user,
        resource=resource,
        action=action,
        resource_id=resource_id,
        context=context
    )

    return {"has_permission": has_access}


# Register routers
app.include_router(auth_router)
app.include_router(user_router)
app.include_router(role_router)
app.include_router(permission_router)
app.include_router(group_router)

Implementation of Modern Permission Models

ABAC Implementation

Our permission system implements Attribute-Based Access Control through:

  1. Contextual Permission Checking: The has_permission function evaluates permissions based on:

    • User attributes (roles, group memberships)
    • Environmental factors (time of day, IP address)
    • Resource-specific attributes
  2. JSON Conditions: Each permission can carry conditions stored as JSON, for example:

{
  "time_between": ["09:00", "17:00"],
  "ip_range": ["192.168.1.0/24", "10.0.0.0/8"],
  "department": "finance"
}
  3. Dynamic Context Evaluation:

# Example usage in an API endpoint
@app.get("/reports/{report_id}")
async def get_report(
    report_id: int,
    request: Request,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(get_current_active_user)
):
    # Create context with current time and IP
    context = {
        "current_time": datetime.now().time(),
        "ip_address": request.client.host,
        "department": current_user.department  # Assuming user has department attribute
    }

    # Check permission with context
    if not await has_permission(
        user=current_user,
        resource="reports",
        action="read",
        resource_id=report_id,
        context=context
    ):
        raise HTTPException(status_code=403, detail="Permission denied")

    # Retrieve and return report...
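How a JSON condition set might be checked against the request context can be sketched with the standard library alone. This is a minimal illustration, not the article's has_permission implementation: the function name conditions_met is hypothetical, and it assumes the three condition keys from the JSON example (time_between, ip_range, department) and the context keys used in the endpoint example.

```python
from datetime import time
from ipaddress import ip_address, ip_network
from typing import Any, Mapping, Optional


def conditions_met(
    conditions: Optional[Mapping[str, Any]],
    context: Mapping[str, Any],
) -> bool:
    """Return True only if every condition present is satisfied by the context.

    Hypothetical helper: the keys mirror the JSON example, nothing more.
    """
    if not conditions:
        return True  # an unconditional permission always passes

    if "time_between" in conditions:
        start, end = (time.fromisoformat(t) for t in conditions["time_between"])
        now = context.get("current_time")
        if now is None or not (start <= now <= end):
            return False

    if "ip_range" in conditions:
        try:
            client = ip_address(context.get("ip_address") or "")
        except ValueError:
            return False  # missing or malformed address fails closed
        if not any(client in ip_network(cidr) for cidr in conditions["ip_range"]):
            return False

    if "department" in conditions:
        if context.get("department") != conditions["department"]:
            return False

    return True


example_conditions = {
    "time_between": ["09:00", "17:00"],
    "ip_range": ["192.168.1.0/24", "10.0.0.0/8"],
    "department": "finance",
}
example_context = {
    "current_time": time(10, 30),
    "ip_address": "192.168.1.42",
    "department": "finance",
}
allowed = conditions_met(example_conditions, example_context)
```

The sketch fails closed: a missing time, an unparseable IP address, or a mismatched department denies access rather than silently skipping the check.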

Policy-Based Access Control

Our system implements PBAC through centralized permission policies:

  1. Hierarchical Permission Inheritance:

    • Users inherit permissions from roles
    • Roles can be assigned to groups
    • Groups can have multiple roles
  2. Policy Centralization:

    • Permissions are defined once and reused
    • Conditions are stored in the database
    • Changes to policies are immediately reflected
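The inheritance chain described above can be illustrated with plain dataclasses. This is a sketch under assumptions: the classes below are simplified stand-ins for the article's ORM models, and effective_permissions is a hypothetical helper name. It assumes a user's effective permissions are the union of permissions from directly assigned roles and from roles attached to the user's groups.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass(frozen=True)
class Permission:
    resource: str
    action: str


@dataclass
class Role:
    name: str
    permissions: List[Permission] = field(default_factory=list)


@dataclass
class Group:
    name: str
    roles: List[Role] = field(default_factory=list)


@dataclass
class User:
    username: str
    roles: List[Role] = field(default_factory=list)
    groups: List[Group] = field(default_factory=list)


def effective_permissions(user: User) -> Set[Permission]:
    """Union of permissions from direct roles and from roles of the user's groups."""
    roles = list(user.roles)
    for group in user.groups:
        roles.extend(group.roles)
    return {perm for role in roles for perm in role.permissions}


read_reports = Permission("reports", "read")
write_reports = Permission("reports", "write")
viewer = Role("viewer", [read_reports])
editor = Role("editor", [read_reports, write_reports])
finance = Group("finance", [editor])

alice = User("alice", roles=[viewer], groups=[finance])
alice_permissions = effective_permissions(alice)
```

Because permissions are collected into a set, a permission granted through several roles appears once, which keeps the later condition checks from running twice for the same resource and action.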

Performance Optimization

For large-scale applications, we've incorporated several performance enhancements:

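  1. Async Database Operations: Using SQLAlchemy's AsyncSession so database calls do not block the asyncio event loop

  2. Background Tasks: Moving audit logging to FastAPI background tasks so responses are not delayed by log writes

  3. Efficient Queries: Using SQLAlchemy 2.0-style select() statements with offset and limit for paginated listings

  4. Pydantic v2: Utilizing Pydantic v2, whose Rust-based validation core is significantly faster than v1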

Security Best Practices

Our implementation incorporates several security best practices:

  1. Password Handling:

    • Secure hashing with bcrypt
    • Password strength validation
    • Separate password change endpoint
  2. API Security:

    • JWT authentication with expiration
    • CORS protection
    • Input validation
  3. Data Protection:

    • Separate request/response models
    • No sensitive data in responses
    • Audit logging for all sensitive operations
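As an illustration of the password strength validation item, here is a minimal sketch. The rules (minimum length of 12 and one character from each class) and the name password_problems are assumptions chosen for the example, not the policy used elsewhere in the article.

```python
import re
from typing import List

MIN_LENGTH = 12  # assumed policy value, not taken from the article


def password_problems(password: str) -> List[str]:
    """Return human-readable problems; an empty list means the password passes."""
    problems: List[str] = []
    if len(password) < MIN_LENGTH:
        problems.append(f"must be at least {MIN_LENGTH} characters long")
    if not re.search(r"[a-z]", password):
        problems.append("must contain a lowercase letter")
    if not re.search(r"[A-Z]", password):
        problems.append("must contain an uppercase letter")
    if not re.search(r"\d", password):
        problems.append("must contain a digit")
    if not re.search(r"[^A-Za-z0-9]", password):
        problems.append("must contain a symbol")
    return problems


weak_password_report = password_problems("short")
strong_password_report = password_problems("Correct-Horse-7-Battery")
```

Returning every problem at once, rather than stopping at the first, lets an API report all violations in a single validation error.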

Conclusion

This article has demonstrated how to build a modern user permission management system using the latest versions of FastAPI, SQLAlchemy 2.0, and MariaDB. By following current best practices and incorporating recent advancements in these technologies, you can create a secure, scalable, and maintainable permission system.

Key advantages of this approach include:

  1. Type Safety: Using SQLAlchemy 2.0's typed attributes and Python's type hints
  2. Performance: Async operations and modern optimizations
  3. Flexibility: Supporting complex permission models like ABAC and PBAC
  4. Maintainability: Clear code organization with routers and dependency injection
  5. Security: Comprehensive security controls and audit logging

For production environments, consider these additional enhancements:

  • Implementing rate limiting
  • Adding two-factor authentication
  • Setting up permission caching for frequently checked permissions
  • Integrating with external identity providers like Auth0 or Okta
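The permission caching suggestion could start from a small in-process cache with a time-to-live. The sketch below is one possible shape, assuming a single worker process; the class name PermissionCache, its methods, and the 30-second default are hypothetical, and a multi-worker deployment would more likely use a shared store.

```python
import time
from typing import Callable, Dict, Optional, Tuple

CacheKey = Tuple[int, str, str]  # (user_id, resource, action)


class PermissionCache:
    """In-memory cache of permission decisions that expire after ttl_seconds."""

    def __init__(
        self,
        ttl_seconds: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: Dict[CacheKey, Tuple[float, bool]] = {}

    def get(self, user_id: int, resource: str, action: str) -> Optional[bool]:
        """Return the cached decision, or None if absent or expired."""
        key = (user_id, resource, action)
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, allowed = entry
        if self._clock() >= expires_at:
            del self._entries[key]
            return None
        return allowed

    def set(self, user_id: int, resource: str, action: str, allowed: bool) -> None:
        """Store a decision until the TTL elapses."""
        self._entries[(user_id, resource, action)] = (self._clock() + self._ttl, allowed)

    def invalidate_user(self, user_id: int) -> None:
        """Drop all cached decisions for one user, e.g. after a role change."""
        for key in [k for k in self._entries if k[0] == user_id]:
            del self._entries[key]


cache = PermissionCache(ttl_seconds=30.0)
cache.set(1, "reports", "read", True)
cached_decision = cache.get(1, "reports", "read")
```

A cache like this trades freshness for speed: a policy change can take up to the TTL to apply unless the affected user is invalidated explicitly. Decisions that depend on request context such as time of day or IP address are poor candidates for caching.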

The code provided in this article serves as a solid foundation that you can extend and customize to meet your specific requirements.

Top comments (3)

Thomas Schuering

Really nice article. Maybe the permission_router is missing? Otherwise a big thank you!

Geoffrey Kim

Thank you for noting the missing permission_router implementation. I've updated the article to include the complete code for this router as well as a brief overview of the group_router. The article should now be more comprehensive.

Thomas Schuering

I have one additional question: Why do the groups only have roles, but no permissions by themselves?