In this article, I'll walk through designing and implementing a user permission management system with FastAPI, SQLAlchemy 2.0, and MariaDB. The guide covers database schema design, contemporary permission models, and a practical implementation built on the async SQLAlchemy API and Pydantic v2.
Recent Trends in Permission Models
While traditional Role-Based Access Control (RBAC) is still widely used, modern applications often require more sophisticated permission management approaches. Here are some key trends:
1. Attribute-Based Access Control (ABAC)
ABAC determines access rights based on user attributes, resource attributes, and environmental conditions. This approach enables highly flexible access control policies that can consider factors such as user department, resource sensitivity, and time-based restrictions.
2. Policy-Based Access Control (PBAC)
PBAC manages access through centralized policies that define specific conditions. These policies can include complex rules like "allow access to financial data only for finance department employees during business hours."
3. JSON-Based Permission Management
Storing permissions in JSON format allows for flexible, hierarchical structures that can represent complex permission sets and condition-based rules.
4. User Groups and Hierarchies
Incorporating user groups and hierarchical structures streamlines permission management by allowing administrators to assign permissions to entire groups rather than individual users.
5. Integration with External Authentication
Using standards like OAuth 2.0 and OpenID Connect enables Single Sign-On (SSO) and consistent authentication across multiple systems.
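To make the attribute- and policy-based ideas above concrete, here is a minimal sketch of the quoted rule "allow access to financial data only for finance department employees during business hours". The `Policy` class, the `is_allowed` function, and the resource name `financial_data` are hypothetical names chosen for illustration; they are not part of the implementation later in this article.

```python
from dataclasses import dataclass
from datetime import time


@dataclass(frozen=True)
class Policy:
    """Hypothetical policy record: who may do what, and when."""
    resource: str
    action: str
    department: str
    start: time
    end: time


def is_allowed(policy: Policy, *, resource: str, action: str,
               user_department: str, now: time) -> bool:
    """Return True only if every clause of the policy matches the request."""
    return (
        policy.resource == resource
        and policy.action == action
        and policy.department == user_department   # user attribute
        and policy.start <= now <= policy.end       # environmental condition
    )


# Assumed business hours of 09:00 to 17:00 for the finance policy.
finance_hours = Policy("financial_data", "read", "finance", time(9, 0), time(17, 0))

in_hours = is_allowed(finance_hours, resource="financial_data", action="read",
                      user_department="finance", now=time(10, 30))
after_hours = is_allowed(finance_hours, resource="financial_data", action="read",
                         user_department="finance", now=time(20, 0))
print(in_hours, after_hours)
```

The point of the sketch is that the decision combines a user attribute (department) with an environmental one (time), which a plain role check cannot express.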
Database Design
Let's design a database schema that supports these modern approaches.
Core Tables
CREATE TABLE users (
    id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL UNIQUE,
    password_hash VARCHAR(255) NOT NULL,
    email VARCHAR(100) NOT NULL UNIQUE,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE roles (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(50) NOT NULL UNIQUE,
    description TEXT
);

CREATE TABLE permissions (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL UNIQUE,
    resource VARCHAR(100) NOT NULL,
    action VARCHAR(50) NOT NULL,
    conditions JSON,
    description TEXT,
    UNIQUE KEY resource_action (resource, action)
);

CREATE TABLE groups (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL UNIQUE,
    description TEXT
);
Relationship Tables
CREATE TABLE user_roles (
    user_id INT,
    role_id INT,
    PRIMARY KEY (user_id, role_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);

CREATE TABLE role_permissions (
    role_id INT,
    permission_id INT,
    PRIMARY KEY (role_id, permission_id),
    FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE,
    FOREIGN KEY (permission_id) REFERENCES permissions(id) ON DELETE CASCADE
);

CREATE TABLE user_permissions (
    user_id INT,
    permission_id INT,
    PRIMARY KEY (user_id, permission_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    FOREIGN KEY (permission_id) REFERENCES permissions(id) ON DELETE CASCADE
);

CREATE TABLE user_groups (
    user_id INT,
    group_id INT,
    PRIMARY KEY (user_id, group_id),
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
    FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE
);

CREATE TABLE group_roles (
    group_id INT,
    role_id INT,
    PRIMARY KEY (group_id, role_id),
    FOREIGN KEY (group_id) REFERENCES groups(id) ON DELETE CASCADE,
    FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);

CREATE TABLE audit_logs (
    id INT PRIMARY KEY AUTO_INCREMENT,
    user_id INT,
    action VARCHAR(50) NOT NULL,
    resource_type VARCHAR(50) NOT NULL,
    resource_id INT,
    details JSON,
    ip_address VARCHAR(45),
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL
);
Setting Up FastAPI and SQLAlchemy 2.0
Database Models with SQLAlchemy 2.0
Let's define our models using SQLAlchemy 2.0's latest patterns:
from datetime import datetime
from typing import Any, Dict, List, Optional

from sqlalchemy import (
    JSON, TIMESTAMP, Boolean, Column, ForeignKey, Integer, String, Table, Text,
    UniqueConstraint, func,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


# Base class for SQLAlchemy models using 2.0 style
class Base(DeclarativeBase):
    pass


# Association tables for many-to-many relationships.
# Both columns form the composite primary key, matching the SQL schema above.
user_roles = Table(
    'user_roles',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
)

role_permissions = Table(
    'role_permissions',
    Base.metadata,
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id', ondelete='CASCADE'), primary_key=True),
)

user_permissions = Table(
    'user_permissions',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), primary_key=True),
    Column('permission_id', Integer, ForeignKey('permissions.id', ondelete='CASCADE'), primary_key=True),
)

user_groups = Table(
    'user_groups',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id', ondelete='CASCADE'), primary_key=True),
    Column('group_id', Integer, ForeignKey('groups.id', ondelete='CASCADE'), primary_key=True),
)

group_roles = Table(
    'group_roles',
    Base.metadata,
    Column('group_id', Integer, ForeignKey('groups.id', ondelete='CASCADE'), primary_key=True),
    Column('role_id', Integer, ForeignKey('roles.id', ondelete='CASCADE'), primary_key=True),
)


class User(Base):
    __tablename__ = 'users'

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    password_hash: Mapped[str] = mapped_column(String(255))
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())

    roles: Mapped[List["Role"]] = relationship(secondary=user_roles, back_populates="users")
    direct_permissions: Mapped[List["Permission"]] = relationship(secondary=user_permissions, back_populates="users")
    groups: Mapped[List["Group"]] = relationship(secondary=user_groups, back_populates="users")
    audit_logs: Mapped[List["AuditLog"]] = relationship(back_populates="user")


class Role(Base):
    __tablename__ = 'roles'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    users: Mapped[List["User"]] = relationship(secondary=user_roles, back_populates="roles")
    permissions: Mapped[List["Permission"]] = relationship(secondary=role_permissions, back_populates="roles")
    groups: Mapped[List["Group"]] = relationship(secondary=group_roles, back_populates="roles")


class Permission(Base):
    __tablename__ = 'permissions'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    resource: Mapped[str] = mapped_column(String(100))
    action: Mapped[str] = mapped_column(String(50))
    conditions: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, nullable=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    roles: Mapped[List["Role"]] = relationship(secondary=role_permissions, back_populates="permissions")
    users: Mapped[List["User"]] = relationship(secondary=user_permissions, back_populates="direct_permissions")

    __table_args__ = (
        # Mirrors UNIQUE KEY resource_action (resource, action) in the SQL schema
        UniqueConstraint('resource', 'action', name='resource_action'),
        {'sqlite_autoincrement': True},  # For SQLite, if used
    )


class Group(Base):
    __tablename__ = 'groups'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    users: Mapped[List["User"]] = relationship(secondary=user_groups, back_populates="groups")
    roles: Mapped[List["Role"]] = relationship(secondary=group_roles, back_populates="groups")


class AuditLog(Base):
    __tablename__ = 'audit_logs'

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[Optional[int]] = mapped_column(ForeignKey('users.id', ondelete='SET NULL'), nullable=True)
    action: Mapped[str] = mapped_column(String(50))
    resource_type: Mapped[str] = mapped_column(String(50))
    resource_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    details: Mapped[Optional[Dict[str, Any]]] = mapped_column(JSON, nullable=True)
    ip_address: Mapped[Optional[str]] = mapped_column(String(45), nullable=True)
    timestamp: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())

    user: Mapped[Optional["User"]] = relationship(back_populates="audit_logs")
Database Setup with Async Support
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import Session, sessionmaker

# For async database operations
DATABASE_URL = "mariadb+aiomysql://user:password@localhost/dbname"

async_engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # Logs every SQL statement; disable in production
)

# async_sessionmaker is the SQLAlchemy 2.0 factory for AsyncSession objects
AsyncSessionLocal = async_sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# For sync operations (if needed)
SYNC_DATABASE_URL = "mariadb+pymysql://user:password@localhost/dbname"

sync_engine = create_engine(
    SYNC_DATABASE_URL,
    echo=True,
)

SyncSessionLocal = sessionmaker(
    sync_engine,
    class_=Session,
    expire_on_commit=False
)


# Creating tables (Base is the declarative base from the models above)
async def create_tables():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
Pydantic Models with Pydantic v2
from datetime import datetime
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator


# Shared password rule used by UserCreate and PasswordChange
def validate_password_strength(v: str) -> str:
    if len(v) < 8:
        raise ValueError('Password must be at least 8 characters')
    if not any(char.isdigit() for char in v):
        raise ValueError('Password must contain at least one digit')
    if not any(char.isupper() for char in v):
        raise ValueError('Password must contain at least one uppercase letter')
    return v


# User models
class UserBase(BaseModel):
    username: str
    email: EmailStr


class UserCreate(UserBase):
    password: str

    @field_validator('password')
    @classmethod
    def password_strength(cls, v: str) -> str:
        return validate_password_strength(v)


class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    model_config = ConfigDict(from_attributes=True)


class UserDetails(UserResponse):
    roles: List["RoleResponse"] = []
    groups: List["GroupResponse"] = []

    model_config = ConfigDict(from_attributes=True)


# Role models
class RoleBase(BaseModel):
    name: str
    description: Optional[str] = None


class RoleCreate(RoleBase):
    pass


class RoleResponse(RoleBase):
    id: int

    model_config = ConfigDict(from_attributes=True)


class RoleDetails(RoleResponse):
    permissions: List["PermissionResponse"] = []

    model_config = ConfigDict(from_attributes=True)


# Permission models
class PermissionBase(BaseModel):
    name: str
    resource: str
    action: str
    description: Optional[str] = None
    conditions: Optional[Dict[str, Any]] = None


class PermissionCreate(PermissionBase):
    pass


class PermissionResponse(PermissionBase):
    id: int

    model_config = ConfigDict(from_attributes=True)


# Group models
class GroupBase(BaseModel):
    name: str
    description: Optional[str] = None


class GroupCreate(GroupBase):
    pass


class GroupResponse(GroupBase):
    id: int

    model_config = ConfigDict(from_attributes=True)


class GroupDetails(GroupResponse):
    users: List[UserResponse] = []
    roles: List[RoleResponse] = []

    model_config = ConfigDict(from_attributes=True)


# Update models
class UserUpdate(BaseModel):
    username: Optional[str] = None
    email: Optional[EmailStr] = None
    is_active: Optional[bool] = None


class PasswordChange(BaseModel):
    current_password: str
    new_password: str

    @field_validator('new_password')
    @classmethod
    def password_strength(cls, v: str) -> str:
        return validate_password_strength(v)


# Token models
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenPayload(BaseModel):
    sub: Optional[str] = None
    exp: Optional[int] = None


# Resolve forward references declared as strings above
UserDetails.model_rebuild()
RoleDetails.model_rebuild()
GroupDetails.model_rebuild()
Authentication and Authorization Utilities
import ipaddress
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

from fastapi import Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

# User, Role, Group, AuditLog and AsyncSessionLocal come from the sections above.

# Security configuration
SECRET_KEY = "your-secret-key-here"  # In production, use environment variables
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


# Database dependency
async def get_async_db():
    async with AsyncSessionLocal() as session:
        yield session


# Password utilities
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)


# JWT token functions
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# User authentication: returns the user, or None when the credentials are
# wrong or the account is inactive
async def authenticate_user(db: AsyncSession, username: str, password: str) -> Optional[User]:
    stmt = select(User).where(User.username == username)
    result = await db.execute(stmt)
    user = result.scalars().first()

    if not user or not verify_password(password, user.password_hash):
        return None
    if not user.is_active:
        return None

    return user


# Get current user from token
async def get_current_user(
    db: AsyncSession = Depends(get_async_db),
    token: str = Depends(oauth2_scheme)
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: Optional[str] = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    # Eager-load everything has_permission walks: AsyncSession cannot
    # lazy-load relationships on attribute access.
    stmt = (
        select(User)
        .where(User.username == username)
        .options(
            selectinload(User.direct_permissions),
            selectinload(User.roles).selectinload(Role.permissions),
            selectinload(User.groups).selectinload(Group.roles).selectinload(Role.permissions),
        )
    )
    result = await db.execute(stmt)
    user = result.scalars().first()

    if user is None:
        raise credentials_exception

    return user


# Get current active user
async def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


# ABAC Permission verification
async def has_permission(
    user: User,
    resource: str,
    action: str,
    resource_id: Optional[int] = None,
    context: Optional[Dict[str, Any]] = None
) -> bool:
    """
    Check if user has permission to perform an action on a resource.

    Implements Attribute-Based Access Control (ABAC) by considering:
    - Direct user permissions
    - Role-based permissions
    - Group-based permissions
    - Contextual conditions
    """
    context = context or {}

    # Helper function to check conditions
    def evaluate_conditions(conditions: Optional[Dict[str, Any]], context: Dict[str, Any]) -> bool:
        if not conditions:
            return True

        # Example condition: {"time_between": ["09:00", "17:00"]}
        for condition_key, condition_value in conditions.items():
            if condition_key == "time_between":
                current_time = context.get("current_time", datetime.now().time())
                start_time = datetime.strptime(condition_value[0], "%H:%M").time()
                end_time = datetime.strptime(condition_value[1], "%H:%M").time()
                if not (start_time <= current_time <= end_time):
                    return False
            elif condition_key == "ip_range":
                # Entries may be single addresses or CIDR blocks such as "10.0.0.0/8"
                ip = context.get("ip_address")
                if not ip:
                    return False
                try:
                    address = ipaddress.ip_address(ip)
                except ValueError:
                    return False
                if not any(address in ipaddress.ip_network(net, strict=False) for net in condition_value):
                    return False
            # Add more condition types as needed

        return True

    # A permission grants access when resource, action and conditions all match
    def grants(permission) -> bool:
        return (
            permission.resource == resource
            and permission.action == action
            and evaluate_conditions(permission.conditions, context)
        )

    # Check direct user permissions
    for permission in user.direct_permissions:
        if grants(permission):
            return True

    # Check role-based permissions
    for role in user.roles:
        for permission in role.permissions:
            if grants(permission):
                return True

    # Check group-based permissions (through roles)
    for group in user.groups:
        for role in group.roles:
            for permission in role.permissions:
                if grants(permission):
                    return True

    return False


# Permission dependency for FastAPI routes
def require_permission(resource: str, action: str):
    async def permission_dependency(
        current_user: User = Depends(get_current_active_user),
        request: Request = None
    ):
        context = {
            "current_time": datetime.now().time(),
            # request.client can be None, for example behind some test clients
            "ip_address": request.client.host if request and request.client else None
        }

        if not await has_permission(current_user, resource, action, context=context):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied: {action} on {resource}"
            )
        return current_user

    return permission_dependency


# Audit logging utility
async def create_audit_log(
    db: AsyncSession,
    user_id: Optional[int],
    action: str,
    resource_type: str,
    resource_id: Optional[int] = None,
    details: Optional[Dict[str, Any]] = None,
    ip_address: Optional[str] = None
) -> AuditLog:
    audit_log = AuditLog(
        user_id=user_id,
        action=action,
        resource_type=resource_type,
        resource_id=resource_id,
        details=details,
        ip_address=ip_address
    )
    db.add(audit_log)
    await db.commit()
    await db.refresh(audit_log)
    return audit_log
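The lookup order used by has_permission (direct permissions, then roles, then roles inherited through groups) can be sketched without a database. The dataclasses below are plain stand-ins for the ORM models, and `effective_permissions` is a hypothetical helper, not part of the code above; conditions are left out to keep the example short.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple

# A permission is reduced to a (resource, action) pair for this sketch.
Grant = Tuple[str, str]


@dataclass
class Role:
    name: str
    permissions: List[Grant] = field(default_factory=list)


@dataclass
class Group:
    name: str
    roles: List[Role] = field(default_factory=list)


@dataclass
class User:
    username: str
    direct_permissions: List[Grant] = field(default_factory=list)
    roles: List[Role] = field(default_factory=list)
    groups: List[Group] = field(default_factory=list)


def effective_permissions(user: User) -> Set[Grant]:
    """Union of direct, role-based and group-inherited grants."""
    granted: Set[Grant] = set(user.direct_permissions)
    for role in user.roles:
        granted.update(role.permissions)
    for group in user.groups:
        for role in group.roles:
            granted.update(role.permissions)
    return granted


# Example data (invented for illustration)
viewer = Role("viewer", [("reports", "read")])
editor = Role("editor", [("reports", "update")])
staff = Group("staff", roles=[editor])
alice = User("alice", direct_permissions=[("users", "list")],
             roles=[viewer], groups=[staff])

print(sorted(effective_permissions(alice)))
```

Because grants are purely additive in this model, the order of the three lookups affects only how early a match is found, not the final decision.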
FastAPI Endpoints with Modern Patterns
from datetime import datetime, timedelta
from typing import List, Optional

from fastapi import FastAPI, Depends, HTTPException, status, Request, Response, BackgroundTasks
from fastapi.routing import APIRouter
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import select, update, delete
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

app = FastAPI(
    title="Permission Management API",
    description="Modern API for user permission management",
    version="2.0.0"
)

# Routers for better organization
auth_router = APIRouter(prefix="/auth", tags=["Authentication"])
user_router = APIRouter(prefix="/users", tags=["User Management"])
role_router = APIRouter(prefix="/roles", tags=["Role Management"])
permission_router = APIRouter(prefix="/permissions", tags=["Permission Management"])
group_router = APIRouter(prefix="/groups", tags=["Group Management"])


# Authentication endpoints
@auth_router.post("/token", response_model=Token)
async def login_for_access_token(
    background_tasks: BackgroundTasks,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_async_db),
    request: Request = None
):
    user = await authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )

    # Log successful login in background
    background_tasks.add_task(
        create_audit_log,
        db=db,
        user_id=user.id,
        action="login",
        resource_type="auth",
        details={"success": True},
        ip_address=request.client.host if request else None
    )

    return {"access_token": access_token, "token_type": "bearer"}


# User endpoints
@user_router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user: UserCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("users", "create"))
):
    try:
        hashed_password = get_password_hash(user.password)
        db_user = User(
            username=user.username,
            email=user.email,
            password_hash=hashed_password
        )
        db.add(db_user)
        await db.commit()
        await db.refresh(db_user)

        # Log user creation in background
        background_tasks.add_task(
            create_audit_log,
            db=db,
            user_id=current_user.id,
            action="create",
            resource_type="users",
            resource_id=db_user.id,
            details={"username": user.username, "email": user.email},
            ip_address=request.client.host if request else None
        )

        return db_user
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username or email already exists"
        )


@user_router.get("/", response_model=List[UserResponse])
async def get_users(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("users", "list"))
):
    stmt = select(User).offset(skip).limit(limit)
    result = await db.execute(stmt)
    users = result.scalars().all()
    return users


@user_router.get("/{user_id}", response_model=UserDetails)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("users", "read"))
):
    stmt = (
        select(User)
        .where(User.id == user_id)
        # UserDetails serializes roles and groups, so load them eagerly;
        # AsyncSession cannot lazy-load them during serialization
        .options(selectinload(User.roles), selectinload(User.groups))
    )
    result = await db.execute(stmt)
    db_user = result.scalars().first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    return db_user


@user_router.put("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user_update: UserUpdate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("users", "update"))
):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    db_user = result.scalars().first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Only fields the client actually sent
    update_data = user_update.model_dump(exclude_unset=True)

    try:
        if update_data:
            stmt = (
                update(User)
                .where(User.id == user_id)
                .values(**update_data)
            )
            await db.execute(stmt)
            await db.commit()

            # Refresh user data
            stmt = select(User).where(User.id == user_id)
            result = await db.execute(stmt)
            db_user = result.scalars().first()

            # Log user update in background
            background_tasks.add_task(
                create_audit_log,
                db=db,
                user_id=current_user.id,
                action="update",
                resource_type="users",
                resource_id=user_id,
                details=update_data,
                ip_address=request.client.host if request else None
            )

        return db_user
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username or email already exists"
        )


@user_router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(
    user_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("users", "delete"))
):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    db_user = result.scalars().first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Store username for audit log
    username = db_user.username

    stmt = delete(User).where(User.id == user_id)
    await db.execute(stmt)
    await db.commit()

    # Log user deletion in background
    background_tasks.add_task(
        create_audit_log,
        db=db,
        user_id=current_user.id,
        action="delete",
        resource_type="users",
        resource_id=user_id,
        details={"username": username},
        ip_address=request.client.host if request else None
    )

    return Response(status_code=status.HTTP_204_NO_CONTENT)


# Role endpoints
@role_router.post("/", response_model=RoleResponse, status_code=status.HTTP_201_CREATED)
async def create_role(
    role: RoleCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("roles", "create"))
):
    try:
        db_role = Role(**role.model_dump())
        db.add(db_role)
        await db.commit()
        await db.refresh(db_role)

        # Log role creation in background
        background_tasks.add_task(
            create_audit_log,
            db=db,
            user_id=current_user.id,
            action="create",
            resource_type="roles",
            resource_id=db_role.id,
            details=role.model_dump(),
            ip_address=request.client.host if request else None
        )

        return db_role
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Role name already exists"
        )


# Permission Router Implementation
@permission_router.post("/", response_model=PermissionResponse, status_code=status.HTTP_201_CREATED)
async def create_permission(
    permission: PermissionCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("permissions", "create"))
):
    try:
        db_permission = Permission(**permission.model_dump())
        db.add(db_permission)
        await db.commit()
        await db.refresh(db_permission)

        # Log permission creation in background
        background_tasks.add_task(
            create_audit_log,
            db=db,
            user_id=current_user.id,
            action="create",
            resource_type="permissions",
            resource_id=db_permission.id,
            details=permission.model_dump(),
            ip_address=request.client.host if request else None
        )

        return db_permission
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Permission with this resource and action already exists"
        )


@permission_router.get("/", response_model=List[PermissionResponse])
async def get_permissions(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("permissions", "list"))
):
    stmt = select(Permission).offset(skip).limit(limit)
    result = await db.execute(stmt)
    permissions = result.scalars().all()
    return permissions


@permission_router.get("/{permission_id}", response_model=PermissionResponse)
async def get_permission(
    permission_id: int,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("permissions", "read"))
):
    stmt = select(Permission).where(Permission.id == permission_id)
    result = await db.execute(stmt)
    db_permission = result.scalars().first()

    if db_permission is None:
        raise HTTPException(status_code=404, detail="Permission not found")

    return db_permission


@permission_router.put("/{permission_id}", response_model=PermissionResponse)
async def update_permission(
    permission_id: int,
    permission_update: PermissionCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("permissions", "update"))
):
    stmt = select(Permission).where(Permission.id == permission_id)
    result = await db.execute(stmt)
    db_permission = result.scalars().first()

    if db_permission is None:
        raise HTTPException(status_code=404, detail="Permission not found")

    update_data = permission_update.model_dump()

    try:
        stmt = (
            update(Permission)
            .where(Permission.id == permission_id)
            .values(**update_data)
        )
        await db.execute(stmt)
        await db.commit()

        # Refresh permission data
        stmt = select(Permission).where(Permission.id == permission_id)
        result = await db.execute(stmt)
        db_permission = result.scalars().first()

        # Log permission update in background
        background_tasks.add_task(
            create_audit_log,
            db=db,
            user_id=current_user.id,
            action="update",
            resource_type="permissions",
            resource_id=permission_id,
            details=update_data,
            ip_address=request.client.host if request else None
        )

        return db_permission
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Permission with this resource and action already exists"
        )


@permission_router.delete("/{permission_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_permission(
    permission_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("permissions", "delete"))
):
    stmt = select(Permission).where(Permission.id == permission_id)
    result = await db.execute(stmt)
    db_permission = result.scalars().first()

    if db_permission is None:
        raise HTTPException(status_code=404, detail="Permission not found")

    # Store permission details for audit log
    permission_details = {
        "name": db_permission.name,
        "resource": db_permission.resource,
        "action": db_permission.action
    }

    stmt = delete(Permission).where(Permission.id == permission_id)
    await db.execute(stmt)
    await db.commit()

    # Log permission deletion in background
    background_tasks.add_task(
        create_audit_log,
        db=db,
        user_id=current_user.id,
        action="delete",
        resource_type="permissions",
        resource_id=permission_id,
        details=permission_details,
        ip_address=request.client.host if request else None
    )

    return Response(status_code=status.HTTP_204_NO_CONTENT)


@permission_router.post("/{permission_id}/assign-to-role/{role_id}", status_code=status.HTTP_200_OK)
async def assign_permission_to_role(
    permission_id: int,
    role_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_async_db),
    request: Request = None,
    current_user: User = Depends(require_permission("permissions", "assign"))
):
    # Check if permission exists
    permission_stmt = select(Permission).where(Permission.id == permission_id)
    permission_result = await db.execute(permission_stmt)
    permission = permission_result.scalars().first()

    if permission is None:
        raise HTTPException(status_code=404, detail="Permission not found")

    # Check if role exists; load its permissions eagerly because
    # AsyncSession cannot lazy-load role.permissions below
    role_stmt = (
        select(Role)
        .where(Role.id == role_id)
        .options(selectinload(Role.permissions))
    )
    role_result = await db.execute(role_stmt)
    role = role_result.scalars().first()

    if role is None:
        raise HTTPException(status_code=404, detail="Role not found")

    # Add permission to role if not already assigned
    if permission not in role.permissions:
        role.permissions.append(permission)
        await db.commit()

        # Log assignment in background
        background_tasks.add_task(
            create_audit_log,
            db=db,
            user_id=current_user.id,
            action="assign",
            resource_type="permissions",
            resource_id=permission_id,
            details={"role_id": role_id, "permission_id": permission_id},
            ip_address=request.client.host if request else None
        )

    return {"message": f"Permission '{permission.name}' assigned to role '{role.name}'"}


# Brief Group Router Implementation
# Note: The Group Router follows patterns similar to the User and Role routers
@group_router.post("/", response_model=GroupResponse, status_code=status.HTTP_201_CREATED)
async def create_group(
    group: GroupCreate,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("groups", "create"))
):
    try:
        db_group = Group(**group.model_dump())
        db.add(db_group)
        await db.commit()
        await db.refresh(db_group)
        return db_group
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Group name already exists"
        )


@group_router.get("/", response_model=List[GroupResponse])
async def get_groups(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(require_permission("groups", "list"))
):
    stmt = select(Group).offset(skip).limit(limit)
    result = await db.execute(stmt)
    groups = result.scalars().all()
    return groups

# Additional group endpoints would include:
# - get_group(group_id)
# - update_group(group_id, group_update)
# - delete_group(group_id)
# - add_user_to_group(group_id, user_id)
# - remove_user_from_group(group_id, user_id)
# - assign_role_to_group(group_id, role_id)
# - remove_role_from_group(group_id, role_id)
# These would follow similar patterns to the other routers


# Permission check endpoint (useful for frontend authorization)
@auth_router.get("/check-permission")
async def check_user_permission(
    resource: str,
    action: str,
    resource_id: Optional[int] = None,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(get_current_active_user),
    request: Request = None
):
    context = {
        "current_time": datetime.now().time(),
        "ip_address": request.client.host if request else None
    }

    has_access = await has_permission(
        user=current_user,
        resource=resource,
        action=action,
        resource_id=resource_id,
        context=context
    )

    return {"has_permission": has_access}


# Register routers
app.include_router(auth_router)
app.include_router(user_router)
app.include_router(role_router)
app.include_router(permission_router)
app.include_router(group_router)


# Create tables on startup (recent FastAPI versions deprecate on_event
# in favor of a lifespan handler passed to FastAPI())
@app.on_event("startup")
async def startup():
    await create_tables()
Implementation of Modern Permission Models
ABAC Implementation
Our permission system implements Attribute-Based Access Control through:
- Contextual Permission Checking: The has_permission function evaluates permissions based on:
  - User attributes (roles, group memberships)
  - Environmental factors (time of day, IP address)
  - Resource-specific attributes
- JSON Conditions:
{
  "time_between": ["09:00", "17:00"],
  "ip_range": ["192.168.1.0/24", "10.0.0.0/8"],
  "department": "finance"
}
- Dynamic Context Evaluation:
# Example usage in an API endpoint
@app.get("/reports/{report_id}")
async def get_report(
    report_id: int,
    request: Request,
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(get_current_active_user)
):
    # Create context with current time and IP
    context = {
        "current_time": datetime.now().time(),
        "ip_address": request.client.host,
        "department": current_user.department  # Assuming user has department attribute
    }

    # Check permission with context
    if not await has_permission(
        user=current_user,
        resource="reports",
        action="read",
        resource_id=report_id,
        context=context
    ):
        raise HTTPException(status_code=403, detail="Permission denied")

    # Retrieve and return report...
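To make the link between the JSON conditions and the request context more concrete, here is a minimal sketch of how such a condition document could be evaluated. The function name evaluate_conditions and its matching rules are assumptions made for illustration, not the article's actual has_permission internals: time_between is treated as an inclusive same-day window, ip_range as a list of CIDR blocks, and any other key as an exact-match attribute.

```python
from datetime import time
from ipaddress import ip_address, ip_network
from typing import Any, Dict, Optional


def evaluate_conditions(conditions: Optional[Dict[str, Any]], context: Dict[str, Any]) -> bool:
    """Return True only if every condition in the JSON document is satisfied.

    Hypothetical helper: the key names mirror the JSON example above, but the
    evaluation rules are an assumption.
    """
    if not conditions:
        return True  # a permission without conditions applies unconditionally

    for key, expected in conditions.items():
        if key == "time_between":
            now = context.get("current_time")
            if now is None:
                return False  # fail closed when the context is incomplete
            start, end = (time.fromisoformat(value) for value in expected)
            # Inclusive window; windows that cross midnight are not handled here
            if not (start <= now <= end):
                return False
        elif key == "ip_range":
            raw_ip = context.get("ip_address")
            if raw_ip is None:
                return False
            address = ip_address(raw_ip)
            if not any(address in ip_network(cidr) for cidr in expected):
                return False
        else:
            # Any other key is compared as an exact-match attribute
            if context.get(key) != expected:
                return False
    return True
```

Failing closed when a context value is missing is a deliberate choice in this sketch: a request that cannot prove it meets a condition is denied rather than allowed.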
Policy-Based Access Control
Our system implements PBAC through centralized permission policies:
- Hierarchical Permission Inheritance:
  - Users inherit permissions from roles
  - Roles can be assigned to groups
  - Groups can have multiple roles
- Policy Centralization:
  - Permissions are defined once and reused
  - Conditions are stored in the database
  - Changes to policies are immediately reflected
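The inheritance chain described above can be sketched without a database. The dataclasses below are plain stand-ins for the ORM models (same names, but simplified and hypothetical), and effective_permissions is an illustrative helper rather than part of the article's code. It shows one way a user's permission set could be resolved as the union of permissions from directly assigned roles and from roles held by the user's groups.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple

Permission = Tuple[str, str]  # (resource, action)


@dataclass
class Role:
    name: str
    permissions: Set[Permission] = field(default_factory=set)


@dataclass
class Group:
    name: str
    roles: List[Role] = field(default_factory=list)


@dataclass
class User:
    username: str
    roles: List[Role] = field(default_factory=list)
    groups: List[Group] = field(default_factory=list)


def effective_permissions(user: User) -> Set[Permission]:
    """Union of permissions from direct roles and from roles held by the user's groups."""
    granted: Set[Permission] = set()
    for role in user.roles:
        granted |= role.permissions
    for group in user.groups:
        for role in group.roles:
            granted |= role.permissions
    return granted


# Usage: alice reads reports through her own role and updates them through her group.
viewer = Role("viewer", {("reports", "read")})
editor = Role("editor", {("reports", "update")})
finance = Group("finance", roles=[editor])
alice = User("alice", roles=[viewer], groups=[finance])
print(sorted(effective_permissions(alice)))
```

Because a permission is attached to a role exactly once, changing that role's permission set changes the result for every user and group that references it.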
Performance Optimization
For large-scale applications, we've incorporated several performance enhancements:
- Async Database Operations: Using SQLAlchemy's async capabilities with asyncio
- Background Tasks: Moving audit logging to background tasks
- Efficient Queries: Using SQLAlchemy 2.0's more efficient query patterns
- Pydantic v2: Utilizing the significantly faster Pydantic v2 for validation
Security Best Practices
Our implementation incorporates several security best practices:
- Password Handling:
  - Secure hashing with bcrypt
  - Password strength validation
  - Separate password change endpoint
- API Security:
  - JWT authentication with expiration
  - CORS protection
  - Input validation
- Data Protection:
  - Separate request/response models
  - No sensitive data in responses
  - Audit logging for all sensitive operations
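As an illustration of the password strength validation item, the sketch below shows one possible check using only the standard library. The function name password_problems, the 12-character minimum, and the four character-class rules are assumptions chosen for the example and may differ from the rules the article's validator applies.

```python
import re
from typing import List


def password_problems(password: str, min_length: int = 12) -> List[str]:
    """Return a list of human-readable problems; an empty list means the password passes.

    Hypothetical policy: minimum length plus one lowercase letter, one uppercase
    letter, one digit, and one non-alphanumeric character.
    """
    problems: List[str] = []
    if len(password) < min_length:
        problems.append(f"must be at least {min_length} characters long")
    if not re.search(r"[a-z]", password):
        problems.append("must contain a lowercase letter")
    if not re.search(r"[A-Z]", password):
        problems.append("must contain an uppercase letter")
    if not re.search(r"[0-9]", password):
        problems.append("must contain a digit")
    if not re.search(r"[^A-Za-z0-9]", password):
        problems.append("must contain a symbol")
    return problems
```

Returning every problem at once, instead of raising on the first failure, lets an API report all issues in a single validation error.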
Conclusion
This article has demonstrated how to build a modern user permission management system using the latest versions of FastAPI, SQLAlchemy 2.0, and MariaDB. By following current best practices and incorporating recent advancements in these technologies, you can create a secure, scalable, and maintainable permission system.
Key advantages of this approach include:
- Type Safety: Using SQLAlchemy 2.0's typed attributes and Python's type hints
- Performance: Async operations and modern optimizations
- Flexibility: Supporting complex permission models like ABAC and PBAC
- Maintainability: Clear code organization with routers and dependency injection
- Security: Comprehensive security controls and audit logging
For production environments, consider these additional enhancements:
- Implementing rate limiting
- Adding two-factor authentication
- Setting up permission caching for frequently checked permissions
- Integrating with external identity providers like Auth0 or Okta
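For the permission caching suggestion, a small in-process cache with a time-to-live is one possible starting point. The PermissionCache class below is a hypothetical sketch, not part of the article's code. It assumes the cached decision depends only on user, resource, and action, so permissions with time- or IP-based conditions should bypass it or include those values in the key.

```python
import time
from typing import Callable, Dict, Optional, Tuple

CacheKey = Tuple[int, str, str]  # (user_id, resource, action)


class PermissionCache:
    """In-memory cache of permission decisions with a fixed time-to-live."""

    def __init__(self, ttl_seconds: float = 60.0, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: Dict[CacheKey, Tuple[float, bool]] = {}

    def get(self, user_id: int, resource: str, action: str) -> Optional[bool]:
        """Return the cached decision, or None if it is absent or expired."""
        key = (user_id, resource, action)
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, allowed = entry
        if self._clock() >= expires_at:
            del self._entries[key]
            return None
        return allowed

    def set(self, user_id: int, resource: str, action: str, allowed: bool) -> None:
        """Store a decision that stays valid for ttl_seconds."""
        self._entries[(user_id, resource, action)] = (self._clock() + self._ttl, allowed)

    def invalidate_user(self, user_id: int) -> None:
        """Drop all cached decisions for a user, e.g. after a role or group change."""
        for key in [k for k in self._entries if k[0] == user_id]:
            del self._entries[key]
```

With a cache in place, policy changes are only visible after the entry expires or is invalidated, so role and group assignment endpoints would need to call invalidate_user. A multi-process deployment would also need a shared store instead of a per-process dictionary.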
The code provided in this article serves as a solid foundation that you can extend and customize to meet your specific requirements.
Top comments (3)
Really nice article. Maybe the permission_router is missing? Otherwise a big thank you!
Thank you for noting the missing permission_router implementation. I've updated the article to include the complete code for this router as well as a brief overview of the group_router. The article should now be more comprehensive.
I have one additional question: Why do the groups only have roles, but no permissions by themselves?