DEV Community

Aarav Joshi


8 Python Design Patterns for Resilient Distributed Systems and Microservices

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Design patterns give Python developers proven solutions for building resilient distributed systems and microservices. As a Python architect, I've implemented these patterns across numerous projects and found them invaluable for keeping systems maintainable and scalable. Let me share the eight patterns I rely on most.

Circuit Breaker Pattern

The Circuit Breaker pattern prevents cascading failures in distributed systems. When a service becomes unavailable, the circuit breaker stops sending requests, allowing the failing component time to recover.

I've implemented this pattern numerous times, particularly when integrating with third-party APIs that occasionally experience downtime:

    import functools
    import time
    from enum import Enum


    class CircuitState(Enum):
        CLOSED = 'closed'
        OPEN = 'open'
        HALF_OPEN = 'half_open'


    class CircuitBreaker:
        def __init__(self, failure_threshold=5, recovery_timeout=30, name="default"):
            self.name = name
            self.failure_threshold = failure_threshold
            self.recovery_timeout = recovery_timeout
            self.state = CircuitState.CLOSED
            self.failure_count = 0
            self.last_failure_time = None

        def __call__(self, func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                if self.state == CircuitState.OPEN:
                    if time.time() - self.last_failure_time > self.recovery_timeout:
                        self.state = CircuitState.HALF_OPEN
                        print(f"Circuit {self.name} switching to HALF_OPEN")
                    else:
                        print(f"Circuit {self.name} is OPEN - fast failing")
                        raise CircuitBreakerError(f"Circuit {self.name} is open")

                try:
                    result = func(*args, **kwargs)
                    # Success - reset if in half-open state
                    if self.state == CircuitState.HALF_OPEN:
                        self.reset()
                        print(f"Circuit {self.name} reset to CLOSED after success")
                    return result
                except Exception:
                    # Handle failures
                    self.failure_count += 1
                    self.last_failure_time = time.time()
                    # A failed trial call in HALF_OPEN must re-open the circuit;
                    # otherwise the breaker would stay half-open and never trip again.
                    if (self.state == CircuitState.HALF_OPEN
                            or self.failure_count >= self.failure_threshold):
                        self.state = CircuitState.OPEN
                        print(f"Circuit {self.name} tripped to OPEN after {self.failure_count} failures")
                    raise

            return wrapper

        def reset(self):
            self.failure_count = 0
            self.state = CircuitState.CLOSED
            self.last_failure_time = None


    class CircuitBreakerError(Exception):
        pass

This implementation includes state transitions (CLOSED, OPEN, HALF-OPEN) and automatic recovery attempts. I use this as a decorator for service functions:

    import requests


    @CircuitBreaker(failure_threshold=3, recovery_timeout=10, name="payment_service")
    def process_payment(payment_data):
        # External API call that might fail.
        # requests has no default timeout, so set one to avoid hanging forever.
        response = requests.post(
            "https://payment.example.com/process",
            json=payment_data,
            timeout=5,
        )
        response.raise_for_status()
        return response.json()
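To make the state transitions easier to follow, here is a minimal, self-contained sketch that walks a breaker through CLOSED, OPEN, fast-fail, and recovery. It is not the class above: `MiniBreaker`, `FakeClock`, `flaky`, and `healthy` are hypothetical names invented for this illustration, the clock is injected so nothing has to sleep, and unlike the listing above this version resets its failure count on every success.

```python
class FakeClock:
    """Manually advanced clock (hypothetical helper) so the demo is deterministic."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


class MiniBreaker:
    """Condensed breaker with the same three states, for illustration only."""

    def __init__(self, failure_threshold, recovery_timeout, clock):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = "closed"
        self.failures = 0
        self.opened_at = None

    def call(self, func):
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.state = "half_open"  # timeout elapsed: allow one trial call
        try:
            result = func()
        except Exception:
            self.failures += 1
            # Trip on a failed trial call or once the threshold is reached.
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        self.state = "closed"
        self.failures = 0
        return result


def flaky():
    raise ConnectionError("payment service unreachable")


def healthy():
    return "ok"


clock = FakeClock()
breaker = MiniBreaker(failure_threshold=2, recovery_timeout=10, clock=clock)
states = []

for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
    states.append(breaker.state)  # "closed" after one failure, "open" after two

try:
    breaker.call(healthy)  # rejected without ever calling healthy()
except RuntimeError:
    states.append("rejected")

clock.now += 11  # the recovery timeout elapses
result = breaker.call(healthy)  # trial call succeeds and closes the circuit
states.append(breaker.state)

print(states)  # ['closed', 'open', 'rejected', 'closed']
```

Injecting the clock is a design choice worth copying into real tests: it lets you verify the recovery path without waiting out the timeout.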

Saga Pattern

The Saga pattern helps maintain data consistency across microservices without distributed transactions. Each step in a business process has a corresponding compensation action to undo changes if a step fails.

Here's my implementation of a saga coordinator:

    import uuid
    from enum import Enum
    from typing import Any, Callable, Dict, List


    class SagaStatus(Enum):
        STARTED = "started"
        COMPLETED = "completed"
        FAILED = "failed"


    class SagaStep:
        def __init__(self, action, compensation):
            self.action = action
            self.compensation = compensation
            self.executed = False


    class Saga:
        def __init__(self, name: str):
            self.name = name
            self.saga_id = str(uuid.uuid4())
            self.steps: List[SagaStep] = []
            self.context: Dict[str, Any] = {}
            self.status = SagaStatus.STARTED

        def add_step(self, action: Callable, compensation: Callable):
            """Add a step to the saga with its compensation action"""
            self.steps.append(SagaStep(action, compensation))
            return self

        def execute(self):
            """Execute the saga with compensation on failure"""
            executed_steps = []
            try:
                # Execute each step in sequence
                for step in self.steps:
                    step.action(self.context)
                    step.executed = True
                    executed_steps.append(step)
                self.status = SagaStatus.COMPLETED
                return self.context
            except Exception as e:
                self.status = SagaStatus.FAILED
                # Compensate in reverse order
                for step in reversed(executed_steps):
                    try:
                        step.compensation(self.context)
                    except Exception as comp_error:
                        # Log compensation error but continue with other compensations
                        print(f"Compensation error in {self.name}: {comp_error}")
                # Chain the original exception so the root cause stays visible
                raise SagaExecutionError(f"Saga {self.name} failed: {e}") from e


    class SagaExecutionError(Exception):
        pass

I've used this pattern for order processing workflows:

    def create_order(context):
        # Create order in order service
        order_id = order_service.create_order(context["customer_id"], context["items"])
        context["order_id"] = order_id


    def delete_order(context):
        # Compensation: delete the order
        order_service.delete_order(context["order_id"])


    def reserve_inventory(context):
        # Reserve inventory items
        inventory_service.reserve_items(context["items"])


    def release_inventory(context):
        # Compensation: release reserved inventory
        inventory_service.release_items(context["items"])


    def process_payment(context):
        # Process payment
        payment_id = payment_service.charge(context["customer_id"], context["total"])
        context["payment_id"] = payment_id


    def refund_payment(context):
        # Compensation: refund payment
        payment_service.refund(context["payment_id"])


    # Create and execute the saga
    order_saga = Saga("order_processing")
    order_saga.add_step(create_order, delete_order)
    order_saga.add_step(reserve_inventory, release_inventory)
    order_saga.add_step(process_payment, refund_payment)

    try:
        result = order_saga.execute()
        print(f"Order completed successfully: {result}")
    except SagaExecutionError as e:
        print(f"Order failed with compensation: {e}")
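The order workflow above depends on service clients that aren't shown, so here is a small runnable sketch of the same idea with in-memory stand-ins. It assumes nothing beyond the standard library; `make_step` and `run_saga` are hypothetical helpers written for this illustration, not part of the coordinator above. The point is to show that when the third step fails, the first two are undone in reverse order.

```python
log = []


def make_step(name, fail=False):
    """Build an (action, compensation) pair that records what it did."""

    def action(context):
        if fail:
            raise RuntimeError(f"{name} failed")
        log.append(f"do:{name}")

    def compensation(context):
        log.append(f"undo:{name}")

    return action, compensation


def run_saga(steps, context):
    """Run each action in order; on failure, undo completed steps in reverse."""
    done = []
    try:
        for action, compensation in steps:
            action(context)
            done.append(compensation)
    except Exception as exc:
        for compensation in reversed(done):
            compensation(context)
        return f"failed: {exc}"
    return "completed"


steps = [
    make_step("create_order"),
    make_step("reserve_inventory"),
    make_step("process_payment", fail=True),  # simulate a declined payment
]
outcome = run_saga(steps, {})

print(outcome)  # failed: process_payment failed
print(log)
# ['do:create_order', 'do:reserve_inventory', 'undo:reserve_inventory', 'undo:create_order']
```

Note that the failing step itself is never compensated: it did not complete, so there is nothing to undo.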

Bulkhead Pattern

The Bulkhead pattern isolates components to prevent failures from cascading through the system. I implement this using thread pools and resource limits:

    import concurrent.futures
    import threading
    from dataclasses import dataclass


    @dataclass
    class Bulkhead:
        name: str
        max_concurrent_calls: int
        max_queue_size: int

        def __post_init__(self):
            self._executor = concurrent.futures.ThreadPoolExecutor(
                max_workers=self.max_concurrent_calls,
                thread_name_prefix=f"bulkhead-{self.name}"
            )
            # concurrent.futures has no Semaphore class; use threading.Semaphore.
            # One permit per running call plus one per queued call.
            self._semaphore = threading.Semaphore(
                self.max_concurrent_calls + self.max_queue_size
            )

        def execute(self, fn, *args, **kwargs):
            if not self._semaphore.acquire(blocking=False):
                raise BulkheadFullError(f"Bulkhead {self.name} is full")

            try:
                future = self._executor.submit(fn, *args, **kwargs)
                # Free the permit when the call finishes, whether it succeeds or fails
                future.add_done_callback(lambda _: self._semaphore.release())
                return future
            except Exception:
                self._semaphore.release()
                raise

        def shutdown(self, wait=True):
            self._executor.shutdown(wait=wait)


    class BulkheadFullError(Exception):
        pass

I apply this pattern to isolate critical services:

    # Create bulkheads for different services
    database_bulkhead = Bulkhead("database", max_concurrent_calls=10, max_queue_size=20)
    payment_bulkhead = Bulkhead("payment", max_concurrent_calls=5, max_queue_size=10)
    email_bulkhead = Bulkhead("email", max_concurrent_calls=20, max_queue_size=50)

    # Use the bulkheads
    try:
        # Database operations with limited concurrency
        future = database_bulkhead.execute(db_service.query, "SELECT * FROM users")
        result = future.result(timeout=2.0)  # Set timeout to avoid waiting forever

        # Payment processing with isolation
        payment_future = payment_bulkhead.execute(
            payment_service.process_payment,
            user_id=123,
            amount=99.99
        )
        payment_result = payment_future.result(timeout=5.0)
    except BulkheadFullError as e:
        # Handle overload - perhaps retry later or return degraded response
        print(f"Service overloaded: {e}")
    except concurrent.futures.TimeoutError:
        # Handle timeout
        print("Operation timed out")
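To see the rejection behavior in isolation, the sketch below fills a deliberately tiny bulkhead (one worker, one queue slot) and shows the third call being turned away. It is a simplified illustration built on `threading.Semaphore` and `ThreadPoolExecutor`; `TinyBulkhead`, `slow_call`, and the `release` event are hypothetical names, and the event is only there to hold the calls open so the outcome is deterministic.

```python
import concurrent.futures
import threading


class TinyBulkhead:
    """Minimal bulkhead: bounded workers plus a bounded number of waiting calls."""

    def __init__(self, max_concurrent_calls, max_queue_size):
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max_concurrent_calls
        )
        self._slots = threading.Semaphore(max_concurrent_calls + max_queue_size)

    def execute(self, fn, *args):
        if not self._slots.acquire(blocking=False):
            raise RuntimeError("bulkhead full")
        try:
            future = self._executor.submit(fn, *args)
        except Exception:
            self._slots.release()
            raise
        # Return the slot once the call completes, successfully or not.
        future.add_done_callback(lambda _: self._slots.release())
        return future

    def shutdown(self):
        self._executor.shutdown(wait=True)


release = threading.Event()


def slow_call(n):
    release.wait(timeout=5)  # simulate a slow downstream dependency
    return n * 2


bulkhead = TinyBulkhead(max_concurrent_calls=1, max_queue_size=1)
accepted = [bulkhead.execute(slow_call, 1), bulkhead.execute(slow_call, 2)]

rejected = False
try:
    bulkhead.execute(slow_call, 3)  # one running + one queued: no capacity left
except RuntimeError:
    rejected = True

release.set()  # let the blocked calls finish
results = [f.result(timeout=5) for f in accepted]
bulkhead.shutdown()

print(rejected, results)  # True [2, 4]
```

Rejecting immediately instead of queueing without bound is what keeps a slow dependency from consuming every thread in the process.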

CQRS Pattern

The Command Query Responsibility Segregation pattern separates read and write operations. This allows for optimizing each pathway independently.

Here's a simplified implementation:

    from abc import ABC, abstractmethod
    from typing import Any, Dict, List


    # Command side
    class Command(ABC):
        pass


    class CommandHandler(ABC):
        @abstractmethod
        def handle(self, command: Command) -> None:
            pass


    class CreateOrderCommand(Command):
        def __init__(self, order_id: str, customer_id: str, items: List[Dict]):
            self.order_id = order_id
            self.customer_id = customer_id
            self.items = items


    class CreateOrderHandler(CommandHandler):
        def __init__(self, repository):
            self.repository = repository

        def handle(self, command: CreateOrderCommand) -> None:
            # Business logic for creating an order
            order = {
                'order_id': command.order_id,
                'customer_id': command.customer_id,
                'items': command.items,
                'status': 'created'
            }
            # Store in write model
            self.repository.save(order)
            # Publish event for read model sync
            self.repository.publish_event('order_created', order)


    # Query side
    class Query(ABC):
        pass


    class QueryHandler(ABC):
        @abstractmethod
        def handle(self, query: Query) -> Any:
            pass


    class GetOrderQuery(Query):
        def __init__(self, order_id: str):
            self.order_id = order_id


    class GetOrderHandler(QueryHandler):
        def __init__(self, read_repository):
            self.read_repository = read_repository

        def handle(self, query: GetOrderQuery) -> Dict:
            # Fetch from optimized read model
            return self.read_repository.get_order(query.order_id)


    # Command bus
    class CommandBus:
        def __init__(self):
            self.handlers = {}

        def register(self, command_type, handler):
            self.handlers[command_type] = handler

        def dispatch(self, command):
            handler = self.handlers.get(type(command))
            if not handler:
                raise ValueError(f"No handler registered for {type(command)}")
            return handler.handle(command)


    # Query bus
    class QueryBus:
        def __init__(self):
            self.handlers = {}

        def register(self, query_type, handler):
            self.handlers[query_type] = handler

        def dispatch(self, query):
            handler = self.handlers.get(type(query))
            if not handler:
                raise ValueError(f"No handler registered for {type(query)}")
            return handler.handle(query)

In my projects, I use CQRS when read and write requirements differ significantly:

    import uuid

    # Setup
    command_bus = CommandBus()
    query_bus = QueryBus()

    # Register command handlers
    command_bus.register(CreateOrderCommand, CreateOrderHandler(write_repository))

    # Register query handlers
    query_bus.register(GetOrderQuery, GetOrderHandler(read_repository))


    # Client code - Command side
    def create_order_endpoint(data):
        command = CreateOrderCommand(
            order_id=str(uuid.uuid4()),
            customer_id=data['customer_id'],
            items=data['items']
        )
        command_bus.dispatch(command)
        return {"status": "order created", "order_id": command.order_id}


    # Client code - Query side
    def get_order_endpoint(order_id):
        query = GetOrderQuery(order_id)
        result = query_bus.dispatch(query)
        return result
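The handlers above call `save`, `publish_event`, and `get_order` on repositories that are never defined. One way those pieces might fit together is sketched below with in-memory stand-ins: the write side stores the full order and publishes `order_created`, and a projection turns that event into a denormalized summary for queries. `InMemoryEventBus`, `WriteRepository`, `OrderSummaryReadModel`, and the `sku`/`quantity` item keys are all assumptions made for this example.

```python
from collections import defaultdict


class InMemoryEventBus:
    """Hypothetical in-process bus; a real system would likely use a broker."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, callback):
        self._subscribers[event_type].append(callback)

    def publish(self, event_type, payload):
        for callback in self._subscribers[event_type]:
            callback(payload)


class WriteRepository:
    """Write model: stores full orders and announces changes."""

    def __init__(self, bus):
        self._orders = {}
        self._bus = bus

    def save(self, order):
        self._orders[order["order_id"]] = order

    def publish_event(self, event_type, payload):
        self._bus.publish(event_type, payload)


class OrderSummaryReadModel:
    """Read model: a denormalized view shaped for queries."""

    def __init__(self):
        self._summaries = {}

    def on_order_created(self, order):
        self._summaries[order["order_id"]] = {
            "order_id": order["order_id"],
            "status": order["status"],
            "item_count": sum(item["quantity"] for item in order["items"]),
        }

    def get_order(self, order_id):
        return self._summaries.get(order_id)


bus = InMemoryEventBus()
write_repository = WriteRepository(bus)
read_repository = OrderSummaryReadModel()
bus.subscribe("order_created", read_repository.on_order_created)

order = {
    "order_id": "o-1",
    "customer_id": "c-1",
    "items": [{"sku": "A", "quantity": 2}, {"sku": "B", "quantity": 1}],
    "status": "created",
}
write_repository.save(order)
write_repository.publish_event("order_created", order)

summary = read_repository.get_order("o-1")
print(summary)  # {'order_id': 'o-1', 'status': 'created', 'item_count': 3}
```

Here the projection runs synchronously, so the read model is updated before `publish_event` returns. With a real message broker the update would be asynchronous, and queries could briefly return stale data.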

API Gateway Pattern

The API Gateway pattern centralizes cross-cutting concerns and provides a single entry point to microservices. I've implemented lightweight gateways using FastAPI:

    from fastapi import FastAPI, Depends, HTTPException, Request
    from fastapi.security import OAuth2PasswordBearer
    import httpx
    import jwt
    import time

    app = FastAPI(title="API Gateway")
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

    # Service registry (could be dynamic with service discovery)
    SERVICES = {
        "users": "http://user-service:8001",
        "orders": "http://order-service:8002",
        "products": "http://product-service:8003",
    }

    JWT_SECRET = "your-secret-key"  # In production, use secure environment variables


    # Authentication middleware
    async def get_current_user(token: str = Depends(oauth2_scheme)):
        try:
            payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
            user_id = payload.get("sub")
            if not user_id:
                raise HTTPException(status_code=401, detail="Invalid token")
            return {"user_id": user_id}
        except jwt.PyJWTError:
            raise HTTPException(status_code=401, detail="Invalid token")


    # Rate limiting middleware
    class RateLimiter:
        def __init__(self, requests_per_minute=60):
            self.requests_per_minute = requests_per_minute
            self.requests = {}

        async def __call__(self, request: Request):
            client_ip = request.client.host
            current_time = time.time()

            # Clean old entries
            self.requests = {ip: times for ip, times in self.requests.items()
                             if current_time - times[-1] < 60}

            # Get client's request times
            request_times = self.requests.get(client_ip, [])

            # Remove requests older than 1 minute
            request_times = [t for t in request_times if current_time - t < 60]

            if len(request_times) >= self.requests_per_minute:
                raise HTTPException(status_code=429, detail="Rate limit exceeded")

            # Add current request
            request_times.append(current_time)
            self.requests[client_ip] = request_times


    rate_limiter = RateLimiter(requests_per_minute=100)


    # Service proxy with timeout
    async def proxy_to_service(service: str, path: str, request: Request):
        if service not in SERVICES:
            raise HTTPException(status_code=404, detail=f"Service {service} not found")

        # Get target URL
        target_url = f"{SERVICES[service]}{path}"

        # Forward method, headers, and body
        method = request.method
        headers = dict(request.headers)
        headers.pop("host", None)  # Remove host header
        body = await request.body()

        # Use httpx for async HTTP requests
        async with httpx.AsyncClient(timeout=10.0) as client:
            try:
                response = await client.request(
                    method=method,
                    url=target_url,
                    headers=headers,
                    content=body,
                )
                # Content-Type often carries a charset suffix, so match on the prefix
                is_json = response.headers.get("content-type", "").startswith("application/json")
                return {
                    "status_code": response.status_code,
                    "content": response.json() if is_json else response.text,
                    "headers": dict(response.headers)
                }
            except httpx.TimeoutException:
                raise HTTPException(status_code=504, detail="Service timeout")
            except Exception as e:
                raise HTTPException(status_code=502, detail=f"Error: {str(e)}")


    # API Gateway routes
    @app.get("/")
    async def root():
        return {"message": "API Gateway"}


    @app.get("/health")
    async def health():
        return {"status": "healthy", "timestamp": time.time()}


    # Public routes (no auth)
    @app.get("/public/{service}{path:path}")
    async def public_route(service: str, path: str, request: Request):
        await rate_limiter(request)
        return await proxy_to_service(service, path, request)


    # Protected routes (with auth)
    @app.get("/api/{service}{path:path}")
    async def protected_route(
        service: str,
        path: str,
        request: Request,
        user: dict = Depends(get_current_user)
    ):
        await rate_limiter(request)
        return await proxy_to_service(service, path, request)
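The rate limiter is the part of the gateway that is easiest to get subtly wrong, so it can help to test the sliding-window logic on its own, outside FastAPI. The sketch below is one possible standalone version, not the class above: `SlidingWindowLimiter` is a hypothetical name, it uses a `deque` per client, and the current time is passed in explicitly so the behavior can be checked without waiting a real minute.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `limit` requests per client within the last `window_seconds`."""

    def __init__(self, limit, window_seconds=60.0):
        self.limit = limit
        self.window_seconds = window_seconds
        self._hits = defaultdict(deque)

    def allow(self, client_id, now):
        hits = self._hits[client_id]
        # Drop timestamps that have aged out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


limiter = SlidingWindowLimiter(limit=2, window_seconds=60.0)
decisions = [
    limiter.allow("10.0.0.1", now=0.0),   # first request: allowed
    limiter.allow("10.0.0.1", now=1.0),   # second request: allowed
    limiter.allow("10.0.0.1", now=2.0),   # third within the window: rejected
    limiter.allow("10.0.0.2", now=2.0),   # different client: allowed
    limiter.allow("10.0.0.1", now=61.0),  # earlier hits have expired: allowed
]

print(decisions)  # [True, True, False, True, True]
```

Like the gateway's limiter, this keeps state in process memory, so each gateway instance enforces its own limit. Sharing the limit across instances would need an external store.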

Event Sourcing Pattern

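The Event Sourcing pattern stores state changes as a sequence of events rather than overwriting the current state. An aggregate's state is rebuilt by replaying its events, and because every change is published, other services can subscribe and react to it.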

Here's my implementation:

    import json
    import time
    import uuid
    from typing import Any, Callable, Dict, List, Optional


    class Event:
        def __init__(self, event_type: str, data: Dict[str, Any], aggregate_id: str, version: int):
            self.id = str(uuid.uuid4())
            self.event_type = event_type
            self.data = data
            self.aggregate_id = aggregate_id
            self.version = version
            self.timestamp = time.time()

        def to_dict(self) -> Dict[str, Any]:
            return {
                "id": self.id,
                "event_type": self.event_type,
                "data": self.data,
                "aggregate_id": self.aggregate_id,
                "version": self.version,
                "timestamp": self.timestamp
            }

        @classmethod
        def from_dict(cls, data: Dict[str, Any]) -> "Event":
            event = cls(
                event_type=data["event_type"],
                data=data["data"],
                aggregate_id=data["aggregate_id"],
                version=data["version"]
            )
            event.id = data["id"]
            event.timestamp = data["timestamp"]
            return event


    class EventStore:
        def __init__(self):
            self.events: Dict[str, List[Event]] = {}
            self.subscribers: Dict[str, List[Callable]] = {}

        def save_event(self, event: Event) -> None:
            """Save an event to the store and publish it to subscribers"""
            if event.aggregate_id not in self.events:
                self.events[event.aggregate_id] = []

            # Check for version conflicts
            events = self.events[event.aggregate_id]
            if events and events[-1].version >= event.version:
                raise ValueError(f"Version conflict for {event.aggregate_id}")

            # Store the event
            self.events[event.aggregate_id].append(event)

            # Publish the event
            self._publish(event)

        def get_events(self, aggregate_id: str, since_version: int = 0) -> List[Event]:
            """Get all events for an aggregate ID since a specific version"""
            events = self.events.get(aggregate_id, [])
            return [e for e in events if e.version > since_version]

        def subscribe(self, event_type: str, callback: Callable[[Event], None]) -> None:
            """Subscribe to events of a specific type"""
            if event_type not in self.subscribers:
                self.subscribers[event_type] = []
            self.subscribers[event_type].append(callback)

        def _publish(self, event: Event) -> None:
            """Publish event to subscribers"""
            subscribers = self.subscribers.get(event.event_type, [])
            for subscriber in subscribers:
                try:
                    subscriber(event)
                except Exception as e:
                    # In production, log this error but don't stop processing
                    print(f"Error in subscriber: {e}")


    class Aggregate:
        """Base class for aggregates that use event sourcing"""

        def __init__(self, id: str, event_store: EventStore):
            self.id = id
            self.version = 0
            self._event_store = event_store
            self._changes: List[Event] = []

        def load_from_history(self) -> None:
            """Load the aggregate state from its event history"""
            events = self._event_store.get_events(self.id)
            for event in events:
                self._apply_event(event)
                self.version = event.version

        def apply_event(self, event_type: str, data: Dict[str, Any]) -> None:
            """Apply a new event to the aggregate"""
            event = Event(
                event_type=event_type,
                data=data,
                aggregate_id=self.id,
                version=self.version + 1
            )
            # Apply event to update current state
            self._apply_event(event)
            # Track the change to be committed
            self._changes.append(event)
            self.version += 1

        def commit(self) -> None:
            """Commit all pending changes to the event store"""
            for event in self._changes:
                self._event_store.save_event(event)
            self._changes = []

        def _apply_event(self, event: Event) -> None:
            """Apply an event to the aggregate - override in subclasses"""
            method_name = f"apply_{event.event_type}"
            method = getattr(self, method_name, None)
            if method:
                method(event.data)

Here's an order management example using event sourcing:

    class Order(Aggregate):
        def __init__(self, id: str, event_store: EventStore):
            super().__init__(id, event_store)
            self.status = "new"
            self.items = []
            self.customer_id = None
            self.total = 0

        def create(self, customer_id: str) -> None:
            self.apply_event("order_created", {
                "customer_id": customer_id
            })

        def add_item(self, product_id: str, quantity: int, price: float) -> None:
            if self.status != "new":
                raise ValueError("Cannot add items to non-new order")
            self.apply_event("item_added", {
                "product_id": product_id,
                "quantity": quantity,
                "price": price
            })

        def submit(self) -> None:
            if not self.items:
                raise ValueError("Cannot submit empty order")
            self.apply_event("order_submitted", {})

        def apply_order_created(self, data: Dict[str, Any]) -> None:
            self.customer_id = data["customer_id"]
            self.status = "new"

        def apply_item_added(self, data: Dict[str, Any]) -> None:
            item = {
                "product_id": data["product_id"],
                "quantity": data["quantity"],
                "price": data["price"]
            }
            self.items.append(item)
            self.total += data["quantity"] * data["price"]

        def apply_order_submitted(self, data: Dict[str, Any]) -> None:
            self.status = "submitted"


    # Using the event sourcing system
    event_store = EventStore()

    # Create a new order
    order = Order("order-123", event_store)
    order.create("customer-456")
    order.add_item("product-789", 2, 10.99)
    order.add_item("product-101", 1, 24.99)
    order.submit()
    order.commit()

    # Later, reconstruct the order state
    loaded_order = Order("order-123", event_store)
    loaded_order.load_from_history()
    print(f"Order total: ${loaded_order.total}")
    print(f"Order status: {loaded_order.status}")
    print(f"Items count: {len(loaded_order.items)}")
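Two ideas carry most of the weight in the code above: state is a pure fold over the event stream, and an append with a stale version number is rejected. The sketch below isolates both using plain dictionaries. It is a simplified illustration rather than the classes above: `apply`, `replay`, and `append` are hypothetical helpers, and prices are stored as integer cents (an assumption of this example) to sidestep floating-point rounding.

```python
def apply(state, event):
    """Return the new state after one event; the input state is not mutated."""
    kind, data = event["type"], event["data"]
    if kind == "order_created":
        return {**state, "customer_id": data["customer_id"], "status": "new"}
    if kind == "item_added":
        return {
            **state,
            "items": state["items"] + [data],
            "total_cents": state["total_cents"] + data["quantity"] * data["price_cents"],
        }
    if kind == "order_submitted":
        return {**state, "status": "submitted"}
    return state  # unknown event types are ignored


def replay(events):
    """Rebuild state from scratch by folding over events in version order."""
    state = {"status": None, "customer_id": None, "items": [], "total_cents": 0}
    for event in sorted(events, key=lambda e: e["version"]):
        state = apply(state, event)
    return state


def append(stream, event):
    """Optimistic concurrency: only the next expected version may be appended."""
    expected = len(stream) + 1
    if event["version"] != expected:
        raise ValueError(f"version conflict: expected {expected}, got {event['version']}")
    stream.append(event)


stream = []
append(stream, {"version": 1, "type": "order_created",
                "data": {"customer_id": "customer-456"}})
append(stream, {"version": 2, "type": "item_added",
                "data": {"product_id": "product-789", "quantity": 2, "price_cents": 1099}})
append(stream, {"version": 3, "type": "order_submitted", "data": {}})

conflict = False
try:
    # A second writer that loaded the order at version 2 tries to write version 3 again.
    append(stream, {"version": 3, "type": "item_added",
                    "data": {"product_id": "product-101", "quantity": 1, "price_cents": 2499}})
except ValueError:
    conflict = True

state = replay(stream)
print(conflict, state["status"], state["total_cents"])  # True submitted 2198
```

When a conflict occurs, the usual response is to reload the aggregate from the store, re-check the business rule, and retry the command.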

Health Check Pattern

The Health Check pattern implements monitoring endpoints to report service status:

    import asyncio
    import threading
    import time
    from dataclasses import dataclass, field
    from enum import Enum
    from typing import Any, Callable, Dict, List, Optional


    class HealthStatus(Enum):
        UP = "UP"
        DOWN = "DOWN"
        DEGRADED = "DEGRADED"
        UNKNOWN = "UNKNOWN"


    @dataclass
    class HealthCheck:
        name: str
        check_function: Callable[[], bool]
        timeout: float = 5.0
        importance: str = "critical"  # critical, high, medium, low


    @dataclass
    class DependencyHealth:
        name: str
        status: HealthStatus = HealthStatus.UNKNOWN
        last_checked: float = 0
        details: Dict[str, Any] = field(default_factory=dict)


    class HealthMonitor:
        def __init__(self, service_name: str, version: str):
            self.service_name = service_name
            self.version = version
            self.checks: List[HealthCheck] = []
            self.dependencies: Dict[str, DependencyHealth] = {}
            self.startup_time = time.time()
            # Reentrant lock: update_dependency() calls register_dependency()
            # while already holding it, which would deadlock a plain Lock.
            self._lock = threading.RLock()

        def register_check(self, name: str, check_function: Callable[[], bool],
                           timeout: float = 5.0, importance: str = "critical") -> None:
            """Register a new health check"""
            check = HealthCheck(name, check_function, timeout, importance)
            self.checks.append(check)

        def register_dependency(self, name: str) -> None:
            """Register a dependency to monitor"""
            with self._lock:
                self.dependencies[name] = DependencyHealth(name)

        def update_dependency(self, name: str, status: HealthStatus,
                              details: Optional[Dict[str, Any]] = None) -> None:
            """Update the health status of a dependency"""
            with self._lock:
                if name not in self.dependencies:
                    self.register_dependency(name)

                dep = self.dependencies[name]
                dep.status = status
                dep.last_checked = time.time()
                if details:
                    dep.details = details

        async def run_checks(self) -> Dict[str, Any]:
            """Run all registered health checks"""
            results = {}
            loop = asyncio.get_running_loop()

            for check in self.checks:
                try:
                    # Run the blocking check in a worker thread, with a timeout
                    result = await asyncio.wait_for(
                        loop.run_in_executor(None, check.check_function),
                        timeout=check.timeout
                    )
                    results[check.name] = {
                        "status": HealthStatus.UP.value if result else HealthStatus.DOWN.value,
                        "importance": check.importance
                    }
                except asyncio.TimeoutError:
                    results[check.name] = {
                        "status": HealthStatus.DOWN.value,
                        "importance": check.importance,
                        "error": "Timeout"
                    }
                except Exception as e:
                    results[check.name] = {
                        "status": HealthStatus.DOWN.value,
                        "importance": check.importance,
                        "error": str(e)
                    }

            return results

        async def get_health(self) -> Dict[str, Any]:
            """Get complete health information"""
            check_results = await self.run_checks()

            # Determine overall status
            status = HealthStatus.UP

            # Check for critical failures
            for name, result in check_results.items():
                if (result["importance"] == "critical" and
                        result["status"] == HealthStatus.DOWN.value):
                    status = HealthStatus.DOWN
                    break

            # Check for high importance degradation
            if status != HealthStatus.DOWN:
                for name, result in check_results.items():
                    if (result["importance"] in ["critical", "high"] and
                            result["status"] != HealthStatus.UP.value):
                        status = HealthStatus.DEGRADED
                        break

            # Build health response
            with self._lock:
                deps = {name: {"status": dep.status.value, "details": dep.details}
                        for name, dep in self.dependencies.items()}

            return {
                "status": status.value,
                "service": self.service_name,
                "version": self.version,
                "uptime": time.time() - self.startup_time,
                "checks": check_results,
                "dependencies": deps
            }

To use this with FastAPI:

    from fastapi import FastAPI, HTTPException

    app = FastAPI()
    health_monitor = HealthMonitor("order-service", "1.0.0")

    # Register health checks
    health_monitor.register_check(
        "database",
        lambda: db_pool.is_connected(),
        importance="critical"
    )

    health_monitor.register_check(
        "disk_space",
        lambda: check_disk_space() > 100_000_000,  # 100MB free
        importance="high"
    )

    # Update dependency status
    health_monitor.update_dependency(
        "payment-service",
        HealthStatus.UP,
        {"endpoint": "https://payment.example.com/api"}
    )


    @app.get("/health")
    async def health():
        """Basic health endpoint for load balancers"""
        health_info = await health_monitor.get_health()
        if health_info["status"] == HealthStatus.DOWN.value:
            raise HTTPException(status_code=503, detail="Service Unavailable")
        return {"status": "UP"}


    @app.get("/health/details")
    async def health_details():
        """Detailed health information for monitoring systems"""
        return await health_monitor.get_health()
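The rule that turns individual check results into one overall status is worth testing separately, since it decides whether a load balancer takes the service out of rotation. Here is a small sketch of that rule as a pure function, following the same logic as `get_health` above: a critical check that is DOWN makes the service DOWN, any other unhealthy critical or high-importance check makes it DEGRADED, and lower-importance failures do not change the result. `overall_status` is a hypothetical helper name.

```python
def overall_status(check_results):
    """Collapse per-check results into a single UP / DEGRADED / DOWN status."""
    results = list(check_results.values())

    if any(r["importance"] == "critical" and r["status"] == "DOWN" for r in results):
        return "DOWN"

    if any(r["importance"] in ("critical", "high") and r["status"] != "UP" for r in results):
        return "DEGRADED"

    return "UP"


all_up = overall_status({
    "database": {"importance": "critical", "status": "UP"},
    "disk_space": {"importance": "high", "status": "UP"},
})

disk_low = overall_status({
    "database": {"importance": "critical", "status": "UP"},
    "disk_space": {"importance": "high", "status": "DOWN"},
})

db_down = overall_status({
    "database": {"importance": "critical", "status": "DOWN"},
    "disk_space": {"importance": "high", "status": "UP"},
})

cache_down = overall_status({
    "database": {"importance": "critical", "status": "UP"},
    "cache": {"importance": "low", "status": "DOWN"},
})

print(all_up, disk_low, db_down, cache_down)  # UP DEGRADED DOWN UP
```

Only the DOWN case maps to a 503 in the `/health` endpoint above, so a DEGRADED service keeps receiving traffic while the detailed endpoint reports the problem.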

Sidecar Pattern

The Sidecar pattern deploys helper services alongside main services to handle cross-cutting concerns:
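At its simplest, a sidecar is a helper process that the main service starts, configures through environment variables, and stops on shutdown. The sketch below shows only that lifecycle with the standard library. It is an illustration under stated assumptions, not a full implementation: `start_sidecar`, `stop_sidecar`, and the `SIDECAR_ROLE` variable are hypothetical names, and the "sidecar" here is just a one-line Python script that prints its role and exits.

```python
import os
import subprocess
import sys


def start_sidecar(command, extra_env=None):
    """Launch the helper with the parent's environment plus sidecar-specific values."""
    env = {**os.environ, **(extra_env or {})}
    return subprocess.Popen(command, env=env, stdout=subprocess.PIPE, text=True)


def stop_sidecar(process, timeout=5.0):
    """Ask the helper to exit, escalating to kill if it ignores the request."""
    if process.poll() is None:  # still running
        process.terminate()
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
    return process.returncode


# Stand-in sidecar: a tiny script that reports the role it was configured with.
helper = [sys.executable, "-c", "import os; print(os.environ['SIDECAR_ROLE'])"]

process = start_sidecar(helper, {"SIDECAR_ROLE": "log-shipper"})
output, _ = process.communicate(timeout=10)
exit_code = stop_sidecar(process)

print(output.strip(), exit_code)  # log-shipper 0
```

In container platforms the same role is usually played by a second container in the same pod or task, with the orchestrator handling start and stop instead of `subprocess`.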

    import os
    import time
    import signal
    import subprocess
    import threading
    from typing import List, Dict, Any, Optional
    import logging
    import json


    class SidecarProcess:
        def __init__(self, name: str, command: List[str], env: Optional[Dict[str, str]] = None):
            self.name = name
            self.command = command
            self.env = env or {}
            self.process = None
            self.stopped = False
            self.logger = logging.getLogger(f"sidecar.{name}")

        def start(self) -> bool:
            """Start the sidecar process"""
            try:
                # Merge current environment with sidecar-specific env
                process_

---

## 101 Books

**101 Books** is an AI-driven publishing company co-founded by author **Aarav Joshi**. By leveraging advanced AI technology, we keep our publishing costs incredibly low (some books are priced as low as **$4**), making quality knowledge accessible to everyone.

Check out our book **[Golang Clean Code](https://www.amazon.com/dp/B0DQQF9K3Z)** available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!

## Our Creations

Be sure to check out our creations:

**[Investor Central](https://www.investorcentral.co.uk/)** | **[Investor Central Spanish](https://spanish.investorcentral.co.uk/)** | **[Investor Central German](https://german.investorcentral.co.uk/)** | **[Smart Living](https://smartliving.investorcentral.co.uk/)** | **[Epochs & Echoes](https://epochsandechoes.com/)** | **[Puzzling Mysteries](https://www.puzzlingmysteries.com/)** | **[Hindutva](http://hindutva.epochsandechoes.com/)** | **[Elite Dev](https://elitedev.in/)** | **[JS Schools](https://jsschools.com/)**

---

### We are on Medium

**[Tech Koala Insights](https://techkoalainsights.com/)** | **[Epochs & Echoes World](https://world.epochsandechoes.com/)** | **[Investor Central Medium](https://medium.investorcentral.co.uk/)** | **[Puzzling Mysteries Medium](https://medium.com/puzzling-mysteries)** | **[Science & Epochs Medium](https://science.epochsandechoes.com/)** | **[Modern Hindutva](https://modernhindutva.substack.com/)**
