Design patterns give Python developers proven solutions for building resilient distributed systems and microservices. As a Python architect, I've implemented these patterns across numerous projects and found them invaluable for keeping systems maintainable and scalable. Here are eight patterns I rely on, each with an implementation you can adapt.
Circuit Breaker Pattern
The Circuit Breaker pattern prevents cascading failures in distributed systems. When a service becomes unavailable, the circuit breaker stops sending requests, allowing the failing component time to recover.
I've implemented this pattern numerous times, particularly when integrating with third-party APIs that occasionally experience downtime:
```python
import functools
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = 'closed'
    OPEN = 'open'
    HALF_OPEN = 'half_open'


class CircuitBreakerError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30, name="default"):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    # Recovery window elapsed: let one trial call through
                    self.state = CircuitState.HALF_OPEN
                    print(f"Circuit {self.name} switching to HALF_OPEN")
                else:
                    print(f"Circuit {self.name} is OPEN - fast failing")
                    raise CircuitBreakerError(f"Circuit {self.name} is open")

            try:
                result = func(*args, **kwargs)
            except Exception:
                # Handle failures
                self.failure_count += 1
                self.last_failure_time = time.time()
                # A failed trial call in HALF_OPEN re-opens the circuit at once;
                # in CLOSED the circuit trips when the threshold is reached
                if (self.state == CircuitState.HALF_OPEN
                        or self.failure_count >= self.failure_threshold):
                    self.state = CircuitState.OPEN
                    print(f"Circuit {self.name} tripped to OPEN after {self.failure_count} failures")
                raise

            # Success - reset if in half-open state
            if self.state == CircuitState.HALF_OPEN:
                self.reset()
                print(f"Circuit {self.name} reset to CLOSED after success")
            return result

        return wrapper

    def reset(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None
```
This implementation includes state transitions (CLOSED, OPEN, HALF-OPEN) and automatic recovery attempts. I use this as a decorator for service functions:
```python
import requests


@CircuitBreaker(failure_threshold=3, recovery_timeout=10, name="payment_service")
def process_payment(payment_data):
    # External API call that might fail
    response = requests.post("https://payment.example.com/process", json=payment_data)
    response.raise_for_status()
    return response.json()
```
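The state transitions are easier to follow when the clock is under your control. The following is a minimal sketch, not the decorator above: `MiniBreaker`, its `clock` parameter, and `run_demo` are hypothetical names introduced only to walk through CLOSED, OPEN, and HALF_OPEN deterministically.

```python
class MiniBreaker:
    """Hypothetical stripped-down breaker with an injectable clock."""

    def __init__(self, failure_threshold, recovery_timeout, clock):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock  # callable returning the current time in seconds
        self.state = "closed"
        self.failures = 0
        self.opened_at = None

    def call(self, func):
        if self.state == "open":
            if self.clock() - self.opened_at > self.recovery_timeout:
                self.state = "half_open"  # allow a single trial call
            else:
                raise RuntimeError("circuit open")
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        if self.state == "half_open":
            # Trial call succeeded: close the circuit again
            self.state = "closed"
            self.failures = 0
        return result


def run_demo():
    now = [0.0]  # fake clock value, advanced manually
    breaker = MiniBreaker(failure_threshold=2, recovery_timeout=10,
                          clock=lambda: now[0])
    states = []

    def failing():
        raise ValueError("backend down")

    for _ in range(2):  # two failures reach the threshold
        try:
            breaker.call(failing)
        except ValueError:
            pass
    states.append(breaker.state)

    try:  # still inside the recovery window: rejected without calling
        breaker.call(lambda: "ok")
    except RuntimeError:
        states.append("rejected")

    now[0] = 11.0  # recovery window elapsed
    breaker.call(lambda: "ok")
    states.append(breaker.state)
    return states
```

Calling `run_demo()` returns the sequence of observed states, which makes the breaker's behavior straightforward to assert in a unit test.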
Saga Pattern
The Saga pattern helps maintain data consistency across microservices without distributed transactions. Each step in a business process has a corresponding compensation action to undo changes if a step fails.
Here's my implementation of a saga coordinator:
```python
import uuid
from enum import Enum
from typing import Dict, List, Callable, Any


class SagaStatus(Enum):
    STARTED = "started"
    COMPLETED = "completed"
    FAILED = "failed"


class SagaStep:
    def __init__(self, action, compensation):
        self.action = action
        self.compensation = compensation
        self.executed = False


class SagaExecutionError(Exception):
    pass


class Saga:
    def __init__(self, name: str):
        self.name = name
        self.saga_id = str(uuid.uuid4())
        self.steps: List[SagaStep] = []
        self.context: Dict[str, Any] = {}
        self.status = SagaStatus.STARTED

    def add_step(self, action: Callable, compensation: Callable):
        """Add a step to the saga with its compensation action"""
        self.steps.append(SagaStep(action, compensation))
        return self

    def execute(self):
        """Execute the saga with compensation on failure"""
        executed_steps = []

        try:
            # Execute each step in sequence
            for step in self.steps:
                step.action(self.context)
                step.executed = True
                executed_steps.append(step)

            self.status = SagaStatus.COMPLETED
            return self.context

        except Exception as e:
            self.status = SagaStatus.FAILED

            # Compensate in reverse order
            for step in reversed(executed_steps):
                try:
                    step.compensation(self.context)
                except Exception as comp_error:
                    # Log compensation error but continue with other compensations
                    print(f"Compensation error in {self.name}: {comp_error}")

            raise SagaExecutionError(f"Saga {self.name} failed: {str(e)}")
```
I've used this pattern for order processing workflows:
```python
def create_order(context):
    # Create order in order service
    order_id = order_service.create_order(context["customer_id"], context["items"])
    context["order_id"] = order_id


def delete_order(context):
    # Compensation: delete the order
    order_service.delete_order(context["order_id"])


def reserve_inventory(context):
    # Reserve inventory items
    inventory_service.reserve_items(context["items"])


def release_inventory(context):
    # Compensation: release reserved inventory
    inventory_service.release_items(context["items"])


def process_payment(context):
    # Process payment
    payment_id = payment_service.charge(context["customer_id"], context["total"])
    context["payment_id"] = payment_id


def refund_payment(context):
    # Compensation: refund payment
    payment_service.refund(context["payment_id"])


# Create and execute the saga
order_saga = Saga("order_processing")
order_saga.add_step(create_order, delete_order)
order_saga.add_step(reserve_inventory, release_inventory)
order_saga.add_step(process_payment, refund_payment)

try:
    result = order_saga.execute()
    print(f"Order completed successfully: {result}")
except SagaExecutionError as e:
    print(f"Order failed with compensation: {e}")
```
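The order in which compensations run is the part of a saga that is easiest to get wrong, so it helps to see it in isolation. This is a minimal sketch with no external services: `run_saga` and `demo` are hypothetical helpers, and the step names only mirror the order workflow for illustration.

```python
def run_saga(steps, context):
    """Run (action, compensation) pairs in order; undo completed steps on failure."""
    completed = []
    try:
        for action, compensation in steps:
            action(context)
            completed.append(compensation)
    except Exception:
        # Only steps that finished are compensated, most recent first
        for compensation in reversed(completed):
            compensation(context)
        return False
    return True


def demo():
    log = []

    def record(name):
        # Build a step that just records its own name when called
        return lambda ctx: log.append(name)

    def failing_payment(ctx):
        raise RuntimeError("payment declined")

    steps = [
        (record("create_order"), record("delete_order")),
        (record("reserve_inventory"), record("release_inventory")),
        (failing_payment, record("refund_payment")),
    ]
    succeeded = run_saga(steps, {})
    return succeeded, log
```

Because the payment step never completes, its compensation is never invoked; only the inventory and order steps are undone, in reverse order.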
Bulkhead Pattern
The Bulkhead pattern isolates components to prevent failures from cascading through the system. I implement this using thread pools and resource limits:
```python
import concurrent.futures
import threading
from dataclasses import dataclass


class BulkheadFullError(Exception):
    pass


@dataclass
class Bulkhead:
    name: str
    max_concurrent_calls: int
    max_queue_size: int

    def __post_init__(self):
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_concurrent_calls,
            thread_name_prefix=f"bulkhead-{self.name}"
        )
        # One permit per running or queued call; released when the call finishes
        self._semaphore = threading.Semaphore(
            self.max_concurrent_calls + self.max_queue_size
        )

    def execute(self, fn, *args, **kwargs):
        if not self._semaphore.acquire(blocking=False):
            raise BulkheadFullError(f"Bulkhead {self.name} is full")

        try:
            future = self._executor.submit(fn, *args, **kwargs)
            future.add_done_callback(lambda _: self._semaphore.release())
            return future
        except Exception:
            self._semaphore.release()
            raise

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)
```
I apply this pattern to isolate critical services:
```python
# Create bulkheads for different services
database_bulkhead = Bulkhead("database", max_concurrent_calls=10, max_queue_size=20)
payment_bulkhead = Bulkhead("payment", max_concurrent_calls=5, max_queue_size=10)
email_bulkhead = Bulkhead("email", max_concurrent_calls=20, max_queue_size=50)

# Use the bulkheads
try:
    # Database operations with limited concurrency
    future = database_bulkhead.execute(db_service.query, "SELECT * FROM users")
    result = future.result(timeout=2.0)  # Set timeout to avoid waiting forever

    # Payment processing with isolation
    payment_future = payment_bulkhead.execute(
        payment_service.process_payment,
        user_id=123,
        amount=99.99
    )
    payment_result = payment_future.result(timeout=5.0)

except BulkheadFullError as e:
    # Handle overload - perhaps retry later or return degraded response
    print(f"Service overloaded: {e}")
except concurrent.futures.TimeoutError:
    # Handle timeout
    print("Operation timed out")
```
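To see the rejection behavior without real services, the following minimal sketch saturates a bulkhead with tasks that block on an event. `TinyBulkhead` and `demo` are hypothetical names, and the sizes (one worker, one queue slot) are chosen only to keep the example small.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class TinyBulkhead:
    """Hypothetical bulkhead: a worker pool plus a cap on running and queued calls."""

    def __init__(self, max_concurrent, max_queued):
        self._pool = ThreadPoolExecutor(max_workers=max_concurrent)
        self._slots = threading.BoundedSemaphore(max_concurrent + max_queued)

    def submit(self, fn, *args):
        # Reject immediately instead of letting work pile up
        if not self._slots.acquire(blocking=False):
            raise RuntimeError("bulkhead full")
        try:
            future = self._pool.submit(fn, *args)
        except Exception:
            self._slots.release()
            raise
        future.add_done_callback(lambda _: self._slots.release())
        return future

    def shutdown(self):
        self._pool.shutdown(wait=True)


def demo():
    gate = threading.Event()
    bulkhead = TinyBulkhead(max_concurrent=1, max_queued=1)

    first = bulkhead.submit(gate.wait)   # occupies the only worker
    second = bulkhead.submit(gate.wait)  # sits in the queue
    try:
        bulkhead.submit(gate.wait)       # no slot left
        rejected = False
    except RuntimeError:
        rejected = True

    gate.set()  # unblock the two accepted tasks
    results = (first.result(timeout=5), second.result(timeout=5))
    bulkhead.shutdown()
    return rejected, results
```

The third call fails fast while the first two are still pending, which is the point of the pattern: an overloaded dependency costs callers an exception rather than a blocked thread.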
CQRS Pattern
The Command Query Responsibility Segregation pattern separates read and write operations. This allows for optimizing each pathway independently.
Here's a simplified implementation:
```python
from abc import ABC, abstractmethod
from typing import Dict, List, Any


# Command side
class Command(ABC):
    pass


class CommandHandler(ABC):
    @abstractmethod
    def handle(self, command: Command) -> None:
        pass


class CreateOrderCommand(Command):
    def __init__(self, order_id: str, customer_id: str, items: List[Dict]):
        self.order_id = order_id
        self.customer_id = customer_id
        self.items = items


class CreateOrderHandler(CommandHandler):
    def __init__(self, repository):
        self.repository = repository

    def handle(self, command: CreateOrderCommand) -> None:
        # Business logic for creating an order
        order = {
            'order_id': command.order_id,
            'customer_id': command.customer_id,
            'items': command.items,
            'status': 'created'
        }
        # Store in write model
        self.repository.save(order)
        # Publish event for read model sync
        self.repository.publish_event('order_created', order)


# Query side
class Query(ABC):
    pass


class QueryHandler(ABC):
    @abstractmethod
    def handle(self, query: Query) -> Any:
        pass


class GetOrderQuery(Query):
    def __init__(self, order_id: str):
        self.order_id = order_id


class GetOrderHandler(QueryHandler):
    def __init__(self, read_repository):
        self.read_repository = read_repository

    def handle(self, query: GetOrderQuery) -> Dict:
        # Fetch from optimized read model
        return self.read_repository.get_order(query.order_id)


# Command bus
class CommandBus:
    def __init__(self):
        self.handlers = {}

    def register(self, command_type, handler):
        self.handlers[command_type] = handler

    def dispatch(self, command):
        handler = self.handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler registered for {type(command)}")
        return handler.handle(command)


# Query bus
class QueryBus:
    def __init__(self):
        self.handlers = {}

    def register(self, query_type, handler):
        self.handlers[query_type] = handler

    def dispatch(self, query):
        handler = self.handlers.get(type(query))
        if not handler:
            raise ValueError(f"No handler registered for {type(query)}")
        return handler.handle(query)
```
In my projects, I use CQRS when read and write requirements differ significantly:
```python
import uuid

# Setup
command_bus = CommandBus()
query_bus = QueryBus()

# Register command handlers
command_bus.register(CreateOrderCommand, CreateOrderHandler(write_repository))

# Register query handlers
query_bus.register(GetOrderQuery, GetOrderHandler(read_repository))


# Client code - Command side
def create_order_endpoint(data):
    command = CreateOrderCommand(
        order_id=str(uuid.uuid4()),
        customer_id=data['customer_id'],
        items=data['items']
    )
    command_bus.dispatch(command)
    return {"status": "order created", "order_id": command.order_id}


# Client code - Query side
def get_order_endpoint(order_id):
    query = GetOrderQuery(order_id)
    result = query_bus.dispatch(query)
    return result
```
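The command handler publishes an `order_created` event so the read side can stay in sync, but the read side itself is not shown. One way it might look is a small projection that reshapes the event into a query-friendly record. This is a sketch under assumptions: `InMemoryReadModel`, its `project` method, and the `quantity` field on each item are all hypothetical and not part of the code above.

```python
class InMemoryReadModel:
    """Hypothetical read store holding one denormalized summary per order."""

    def __init__(self):
        self.orders = {}

    def project(self, event_type, payload):
        # Translate a write-side event into the shape queries want to read
        if event_type == "order_created":
            self.orders[payload["order_id"]] = {
                "order_id": payload["order_id"],
                "customer_id": payload["customer_id"],
                # Assumes each item dict carries a "quantity" key
                "item_count": sum(item["quantity"] for item in payload["items"]),
                "status": payload["status"],
            }

    def get_order(self, order_id):
        return self.orders.get(order_id)


read_model = InMemoryReadModel()
read_model.project("order_created", {
    "order_id": "order-1",
    "customer_id": "customer-9",
    "items": [{"sku": "a", "quantity": 2}, {"sku": "b", "quantity": 1}],
    "status": "created",
})
summary = read_model.get_order("order-1")
```

The read record stores a precomputed `item_count` rather than the raw item list, which illustrates why the two models are allowed to diverge: each is shaped for its own workload.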
API Gateway Pattern
The API Gateway pattern centralizes cross-cutting concerns and provides a single entry point to microservices. I've implemented lightweight gateways using FastAPI:
```python
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer
import httpx
import jwt
import time

app = FastAPI(title="API Gateway")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Service registry (could be dynamic with service discovery)
SERVICES = {
    "users": "http://user-service:8001",
    "orders": "http://order-service:8002",
    "products": "http://product-service:8003",
}

JWT_SECRET = "your-secret-key"  # In production, use secure environment variables


# Authentication middleware
async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid token")
        return {"user_id": user_id}
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token")


# Rate limiting middleware
class RateLimiter:
    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.requests = {}

    async def __call__(self, request: Request):
        client_ip = request.client.host
        current_time = time.time()

        # Clean old entries
        self.requests = {ip: times for ip, times in self.requests.items()
                         if current_time - times[-1] < 60}

        # Get client's request times
        request_times = self.requests.get(client_ip, [])

        # Remove requests older than 1 minute
        request_times = [t for t in request_times if current_time - t < 60]

        if len(request_times) >= self.requests_per_minute:
            raise HTTPException(status_code=429, detail="Rate limit exceeded")

        # Add current request
        request_times.append(current_time)
        self.requests[client_ip] = request_times


rate_limiter = RateLimiter(requests_per_minute=100)


# Service proxy with timeout
async def proxy_to_service(service: str, path: str, request: Request):
    if service not in SERVICES:
        raise HTTPException(status_code=404, detail=f"Service {service} not found")

    # Get target URL
    target_url = f"{SERVICES[service]}{path}"

    # Forward method, headers, and body
    method = request.method
    headers = dict(request.headers)
    headers.pop("host", None)  # Remove host header
    body = await request.body()

    # Use httpx for async HTTP requests
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.request(
                method=method,
                url=target_url,
                headers=headers,
                content=body,
            )

            # Content types often carry a charset suffix, so match on the prefix
            content_type = response.headers.get("content-type", "")
            is_json = content_type.startswith("application/json")

            return {
                "status_code": response.status_code,
                "content": response.json() if is_json else response.text,
                "headers": dict(response.headers)
            }
        except httpx.TimeoutException:
            raise HTTPException(status_code=504, detail="Service timeout")
        except Exception as e:
            raise HTTPException(status_code=502, detail=f"Error: {str(e)}")


# API Gateway routes
@app.get("/")
async def root():
    return {"message": "API Gateway"}


@app.get("/health")
async def health():
    return {"status": "healthy", "timestamp": time.time()}


# Public routes (no auth)
@app.get("/public/{service}{path:path}")
async def public_route(service: str, path: str, request: Request):
    await rate_limiter(request)
    return await proxy_to_service(service, path, request)


# Protected routes (with auth)
@app.get("/api/{service}{path:path}")
async def protected_route(
    service: str,
    path: str,
    request: Request,
    user: dict = Depends(get_current_user)
):
    await rate_limiter(request)
    return await proxy_to_service(service, path, request)
```
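The rate limiter in the gateway keeps a list of timestamps per client and rebuilds it on every request. The same sliding-window idea can be sketched with a deque and an explicit `now` argument, which makes it testable without FastAPI or a real clock. `SlidingWindowLimiter` and `allow` are hypothetical names, not part of the gateway code.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Hypothetical per-key limiter: at most `limit` hits in any `window` seconds."""

    def __init__(self, limit, window_seconds=60.0):
        self.limit = limit
        self.window = window_seconds
        self._hits = defaultdict(deque)  # key -> timestamps, oldest first

    def allow(self, key, now):
        hits = self._hits[key]
        # Drop timestamps that have aged out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


limiter = SlidingWindowLimiter(limit=2, window_seconds=60.0)
decisions = [
    limiter.allow("10.0.0.1", now=0.0),    # first hit
    limiter.allow("10.0.0.1", now=1.0),    # second hit
    limiter.allow("10.0.0.1", now=2.0),    # over the limit
    limiter.allow("10.0.0.2", now=2.0),    # different client, separate budget
    limiter.allow("10.0.0.1", now=60.0),   # the hit at t=0 has expired
    limiter.allow("10.0.0.1", now=60.5),   # hits at t=1 and t=60 still count
]
```

Both versions hold state in process memory, so with several gateway instances each one enforces its own limit; a shared store would be needed for a global limit.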
Event Sourcing Pattern
The Event Sourcing pattern stores every state change as an append-only sequence of events. Current state is rebuilt by replaying those events, and the same events can be published to other services for event-driven communication.
Here's my implementation:
```python
import json
import time
import uuid
from typing import Dict, List, Any, Optional, Callable


class Event:
    def __init__(self, event_type: str, data: Dict[str, Any], aggregate_id: str, version: int):
        self.id = str(uuid.uuid4())
        self.event_type = event_type
        self.data = data
        self.aggregate_id = aggregate_id
        self.version = version
        self.timestamp = time.time()

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "event_type": self.event_type,
            "data": self.data,
            "aggregate_id": self.aggregate_id,
            "version": self.version,
            "timestamp": self.timestamp
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Event":
        event = cls(
            event_type=data["event_type"],
            data=data["data"],
            aggregate_id=data["aggregate_id"],
            version=data["version"]
        )
        event.id = data["id"]
        event.timestamp = data["timestamp"]
        return event


class EventStore:
    def __init__(self):
        self.events: Dict[str, List[Event]] = {}
        self.subscribers: Dict[str, List[Callable]] = {}

    def save_event(self, event: Event) -> None:
        """Save an event to the store and publish it to subscribers"""
        if event.aggregate_id not in self.events:
            self.events[event.aggregate_id] = []

        # Check for version conflicts
        events = self.events[event.aggregate_id]
        if events and events[-1].version >= event.version:
            raise ValueError(f"Version conflict for {event.aggregate_id}")

        # Store the event
        self.events[event.aggregate_id].append(event)

        # Publish the event
        self._publish(event)

    def get_events(self, aggregate_id: str, since_version: int = 0) -> List[Event]:
        """Get all events for an aggregate ID since a specific version"""
        events = self.events.get(aggregate_id, [])
        return [e for e in events if e.version > since_version]

    def subscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
        """Subscribe to events of a specific type"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(callback)

    def _publish(self, event: Event) -> None:
        """Publish event to subscribers"""
        subscribers = self.subscribers.get(event.event_type, [])
        for subscriber in subscribers:
            try:
                subscriber(event)
            except Exception as e:
                # In production, log this error but don't stop processing
                print(f"Error in subscriber: {e}")


class Aggregate:
    """Base class for aggregates that use event sourcing"""

    def __init__(self, id: str, event_store: EventStore):
        self.id = id
        self.version = 0
        self._event_store = event_store
        self._changes: List[Event] = []

    def load_from_history(self) -> None:
        """Load the aggregate state from its event history"""
        events = self._event_store.get_events(self.id)
        for event in events:
            self._apply_event(event)
            self.version = event.version

    def apply_event(self, event_type: str, data: Dict[str, Any]) -> None:
        """Apply a new event to the aggregate"""
        event = Event(
            event_type=event_type,
            data=data,
            aggregate_id=self.id,
            version=self.version + 1
        )

        # Apply event to update current state
        self._apply_event(event)

        # Track the change to be committed
        self._changes.append(event)
        self.version += 1

    def commit(self) -> None:
        """Commit all pending changes to the event store"""
        for event in self._changes:
            self._event_store.save_event(event)
        self._changes = []

    def _apply_event(self, event: Event) -> None:
        """Apply an event to the aggregate - override in subclasses"""
        method_name = f"apply_{event.event_type}"
        method = getattr(self, method_name, None)
        if method:
            method(event.data)
```
Here's an order management example using event sourcing:
```python
class Order(Aggregate):
    def __init__(self, id: str, event_store: EventStore):
        super().__init__(id, event_store)
        self.status = "new"
        self.items = []
        self.customer_id = None
        self.total = 0

    def create(self, customer_id: str) -> None:
        self.apply_event("order_created", {
            "customer_id": customer_id
        })

    def add_item(self, product_id: str, quantity: int, price: float) -> None:
        if self.status != "new":
            raise ValueError("Cannot add items to non-new order")

        self.apply_event("item_added", {
            "product_id": product_id,
            "quantity": quantity,
            "price": price
        })

    def submit(self) -> None:
        if not self.items:
            raise ValueError("Cannot submit empty order")

        self.apply_event("order_submitted", {})

    def apply_order_created(self, data: Dict[str, Any]) -> None:
        self.customer_id = data["customer_id"]
        self.status = "new"

    def apply_item_added(self, data: Dict[str, Any]) -> None:
        item = {
            "product_id": data["product_id"],
            "quantity": data["quantity"],
            "price": data["price"]
        }
        self.items.append(item)
        self.total += data["quantity"] * data["price"]

    def apply_order_submitted(self, data: Dict[str, Any]) -> None:
        self.status = "submitted"


# Using the event sourcing system
event_store = EventStore()

# Create a new order
order = Order("order-123", event_store)
order.create("customer-456")
order.add_item("product-789", 2, 10.99)
order.add_item("product-101", 1, 24.99)
order.submit()
order.commit()

# Later, reconstruct the order state
loaded_order = Order("order-123", event_store)
loaded_order.load_from_history()
print(f"Order total: ${loaded_order.total}")
print(f"Order status: {loaded_order.status}")
print(f"Items count: {len(loaded_order.items)}")
```
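Rebuilding state from history is, at its core, a left fold over the event list. The sketch below shows that idea on its own, without the `Aggregate` base class: `apply_order_event` and `replay` are hypothetical names, events are plain `(type, data)` tuples, and prices are integer cents here (an assumption made to keep the arithmetic exact).

```python
from functools import reduce


def apply_order_event(state, event):
    """Return the next state; never mutates the state passed in."""
    kind, data = event
    if kind == "order_created":
        return {**state, "customer_id": data["customer_id"], "status": "new"}
    if kind == "item_added":
        line_total = data["quantity"] * data["price_cents"]
        return {
            **state,
            "items": state["items"] + [data],
            "total_cents": state["total_cents"] + line_total,
        }
    if kind == "order_submitted":
        return {**state, "status": "submitted"}
    return state  # unknown event types are ignored


def replay(events):
    initial = {"customer_id": None, "status": "new", "items": [], "total_cents": 0}
    return reduce(apply_order_event, events, initial)


history = [
    ("order_created", {"customer_id": "customer-456"}),
    ("item_added", {"product_id": "product-789", "quantity": 2, "price_cents": 1099}),
    ("item_added", {"product_id": "product-101", "quantity": 1, "price_cents": 2499}),
    ("order_submitted", {}),
]
current = replay(history)
earlier = replay(history[:2])  # state as it was after the first item
```

Replaying a prefix of the history yields the state at that point in time, which is one of the practical benefits of keeping events instead of only the latest row.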
Health Check Pattern
The Health Check pattern implements monitoring endpoints to report service status:
```python
import time
import asyncio
import threading
from enum import Enum
from typing import Dict, List, Callable, Any, Optional
from dataclasses import dataclass, field


class HealthStatus(Enum):
    UP = "UP"
    DOWN = "DOWN"
    DEGRADED = "DEGRADED"
    UNKNOWN = "UNKNOWN"


@dataclass
class HealthCheck:
    name: str
    check_function: Callable[[], bool]
    timeout: float = 5.0
    importance: str = "critical"  # critical, high, medium, low


@dataclass
class DependencyHealth:
    name: str
    status: HealthStatus = HealthStatus.UNKNOWN
    last_checked: float = 0
    details: Dict[str, Any] = field(default_factory=dict)


class HealthMonitor:
    def __init__(self, service_name: str, version: str):
        self.service_name = service_name
        self.version = version
        self.checks: List[HealthCheck] = []
        self.dependencies: Dict[str, DependencyHealth] = {}
        self.startup_time = time.time()
        self._lock = threading.Lock()

    def register_check(self, name: str, check_function: Callable[[], bool],
                       timeout: float = 5.0, importance: str = "critical") -> None:
        """Register a new health check"""
        check = HealthCheck(name, check_function, timeout, importance)
        self.checks.append(check)

    def register_dependency(self, name: str) -> None:
        """Register a dependency to monitor"""
        with self._lock:
            self.dependencies[name] = DependencyHealth(name)

    def update_dependency(self, name: str, status: HealthStatus,
                          details: Optional[Dict[str, Any]] = None) -> None:
        """Update the health status of a dependency"""
        with self._lock:
            # Create a missing entry inline: calling register_dependency() here
            # would re-acquire the non-reentrant lock and deadlock
            dep = self.dependencies.setdefault(name, DependencyHealth(name))
            dep.status = status
            dep.last_checked = time.time()
            if details:
                dep.details = details

    async def run_checks(self) -> Dict[str, Any]:
        """Run all registered health checks"""
        results = {}
        loop = asyncio.get_running_loop()

        for check in self.checks:
            try:
                # Run check with timeout
                result = await asyncio.wait_for(
                    loop.run_in_executor(None, check.check_function),
                    timeout=check.timeout
                )
                results[check.name] = {
                    "status": HealthStatus.UP.value if result else HealthStatus.DOWN.value,
                    "importance": check.importance
                }
            except asyncio.TimeoutError:
                results[check.name] = {
                    "status": HealthStatus.DOWN.value,
                    "importance": check.importance,
                    "error": "Timeout"
                }
            except Exception as e:
                results[check.name] = {
                    "status": HealthStatus.DOWN.value,
                    "importance": check.importance,
                    "error": str(e)
                }

        return results

    async def get_health(self) -> Dict[str, Any]:
        """Get complete health information"""
        check_results = await self.run_checks()

        # Determine overall status
        status = HealthStatus.UP

        # Check for critical failures
        for name, result in check_results.items():
            if (result["importance"] == "critical" and
                    result["status"] == HealthStatus.DOWN.value):
                status = HealthStatus.DOWN
                break

        # Check for high importance degradation
        if status != HealthStatus.DOWN:
            for name, result in check_results.items():
                if (result["importance"] in ["critical", "high"] and
                        result["status"] != HealthStatus.UP.value):
                    status = HealthStatus.DEGRADED
                    break

        # Build health response
        with self._lock:
            deps = {name: {"status": dep.status.value, "details": dep.details}
                    for name, dep in self.dependencies.items()}

        return {
            "status": status.value,
            "service": self.service_name,
            "version": self.version,
            "uptime": time.time() - self.startup_time,
            "checks": check_results,
            "dependencies": deps
        }
```
To use this with FastAPI:
```python
from fastapi import FastAPI, HTTPException

app = FastAPI()
health_monitor = HealthMonitor("order-service", "1.0.0")

# Register health checks
health_monitor.register_check(
    "database",
    lambda: db_pool.is_connected(),
    importance="critical"
)

health_monitor.register_check(
    "disk_space",
    lambda: check_disk_space() > 100_000_000,  # 100MB free
    importance="high"
)

# Update dependency status
health_monitor.update_dependency(
    "payment-service",
    HealthStatus.UP,
    {"endpoint": "https://payment.example.com/api"}
)


@app.get("/health")
async def health():
    """Basic health endpoint for load balancers"""
    health_info = await health_monitor.get_health()

    if health_info["status"] == HealthStatus.DOWN.value:
        raise HTTPException(status_code=503, detail="Service Unavailable")

    return {"status": "UP"}


@app.get("/health/details")
async def health_details():
    """Detailed health information for monitoring systems"""
    return await health_monitor.get_health()
```
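The rule that turns individual check results into one overall status is worth testing on its own, separate from asyncio and FastAPI. The following sketch restates the aggregation as a pure function; `overall_status` is a hypothetical helper, and it assumes results shaped like the `checks` dictionary returned by `run_checks`.

```python
def overall_status(check_results):
    """Collapse per-check results into a single UP / DEGRADED / DOWN value."""
    results = list(check_results.values())

    # Any critical check that is DOWN takes the whole service DOWN
    if any(r["importance"] == "critical" and r["status"] == "DOWN" for r in results):
        return "DOWN"

    # Otherwise a failing critical or high-importance check means DEGRADED
    if any(r["importance"] in ("critical", "high") and r["status"] != "UP"
           for r in results):
        return "DEGRADED"

    return "UP"


all_good = overall_status({
    "database": {"status": "UP", "importance": "critical"},
    "disk_space": {"status": "UP", "importance": "high"},
})
disk_low = overall_status({
    "database": {"status": "UP", "importance": "critical"},
    "disk_space": {"status": "DOWN", "importance": "high"},
})
db_down = overall_status({
    "database": {"status": "DOWN", "importance": "critical"},
    "disk_space": {"status": "UP", "importance": "high"},
})
cache_down = overall_status({
    "cache": {"status": "DOWN", "importance": "low"},
})
```

Under this rule a failing low-importance check does not change the overall status, so a load balancer keeps routing traffic while the detailed endpoint still reports the failure.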
Sidecar Pattern
The Sidecar pattern deploys helper services alongside main services to handle cross-cutting concerns:
```python
import os
import time
import signal
import subprocess
import threading
from typing import List, Dict, Any, Optional
import logging
import json


class SidecarProcess:
    def __init__(self, name: str, command: List[str], env: Optional[Dict[str, str]] = None):
        self.name = name
        self.command = command
        self.env = env or {}
        self.process = None
        self.stopped = False
        self.logger = logging.getLogger(f"sidecar.{name}")

    def start(self) -> bool:
        """Start the sidecar process"""
        try:
            # Merge current environment with sidecar-specific env
            process_
```