In this tutorial, we'll create a scalable task management API using Apache Cassandra as our NoSQL database and FastAPI for the web framework. Cassandra is an excellent choice for applications requiring high availability, horizontal scalability, and consistent performance with large datasets.
Why Apache Cassandra?
While MongoDB and Redis are popular NoSQL choices, Cassandra offers unique benefits:
- Linear scalability with no single point of failure
- Tunable consistency levels
- Optimized for write-heavy workloads
- Excellent support for time-series data
- Built-in partitioning and replication
Prerequisites
- Python 3.8+
- Docker and Docker Compose
- Basic understanding of REST APIs
- Familiarity with async programming
Project Setup
First, let's create our project structure:
task-manager/ ├── docker-compose.yml ├── requirements.txt ├── app/ │ ├── __init__.py │ ├── main.py │ ├── models.py │ ├── database.py │ └── routes/ │ └── tasks.py └── README.md
Let's set up our dependencies in requirements.txt
:
fastapi==0.104.1 uvicorn==0.24.0 cassandra-driver==3.28.0 pydantic==2.4.2 python-dotenv==1.0.0
Create a docker-compose.yml
for Cassandra:
version: '3' services: cassandra: image: cassandra:latest ports: - "9042:9042" environment: - CASSANDRA_CLUSTER_NAME=TaskManagerCluster volumes: - cassandra_data:/var/lib/cassandra volumes: cassandra_data:
Database Setup
Let's create our database connection handler in database.py
:
from cassandra.cluster import Cluster from cassandra.auth import PlainTextAuthProvider from cassandra.cqlengine import connection from contextlib import asynccontextmanager class CassandraConnection: def __init__(self): self.cluster = None self.session = None async def connect(self): self.cluster = Cluster(['localhost']) self.session = self.cluster.connect() # Create keyspace if not exists self.session.execute(""" CREATE KEYSPACE IF NOT EXISTS task_manager WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1} """) self.session.set_keyspace('task_manager') # Create tasks table self.session.execute(""" CREATE TABLE IF NOT EXISTS tasks ( task_id uuid, title text, description text, status text, due_date timestamp, created_at timestamp, updated_at timestamp, PRIMARY KEY (task_id) ) """) return self.session async def disconnect(self): if self.cluster: self.cluster.shutdown() db = CassandraConnection() @asynccontextmanager async def get_db(): try: session = await db.connect() yield session finally: await db.disconnect()
Models
Define our data models in models.py
:
from datetime import datetime from uuid import UUID, uuid4 from pydantic import BaseModel, Field from typing import Optional class TaskBase(BaseModel): title: str description: Optional[str] = None status: str = "pending" due_date: Optional[datetime] = None class TaskCreate(TaskBase): pass class Task(TaskBase): task_id: UUID = Field(default_factory=uuid4) created_at: datetime = Field(default_factory=datetime.now) updated_at: datetime = Field(default_factory=datetime.now) class Config: from_attributes = True
API Routes
Create our task routes in routes/tasks.py
:
from fastapi import APIRouter, Depends, HTTPException from cassandra.cluster import Session from datetime import datetime import uuid from typing import List from ..models import Task, TaskCreate from ..database import get_db router = APIRouter() @router.post("/tasks/", response_model=Task) async def create_task(task: TaskCreate, db: Session = Depends(get_db)): task_id = uuid.uuid4() now = datetime.now() query = """ INSERT INTO tasks (task_id, title, description, status, due_date, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s, %s) """ db.execute(query, ( task_id, task.title, task.description, task.status, task.due_date, now, now )) return Task( task_id=task_id, title=task.title, description=task.description, status=task.status, due_date=task.due_date, created_at=now, updated_at=now ) @router.get("/tasks/{task_id}", response_model=Task) async def read_task(task_id: uuid.UUID, db: Session = Depends(get_db)): query = "SELECT * FROM tasks WHERE task_id = %s" result = db.execute(query, [task_id]).one() if not result: raise HTTPException(status_code=404, detail="Task not found") return Task(**result) @router.get("/tasks/", response_model=List[Task]) async def list_tasks(db: Session = Depends(get_db)): query = "SELECT * FROM tasks" results = db.execute(query) return [Task(**row) for row in results] @router.put("/tasks/{task_id}", response_model=Task) async def update_task(task_id: uuid.UUID, task: TaskCreate, db: Session = Depends(get_db)): # Check if task exists exists_query = "SELECT * FROM tasks WHERE task_id = %s" existing_task = db.execute(exists_query, [task_id]).one() if not existing_task: raise HTTPException(status_code=404, detail="Task not found") update_query = """ UPDATE tasks SET title = %s, description = %s, status = %s, due_date = %s, updated_at = %s WHERE task_id = %s """ now = datetime.now() db.execute(update_query, ( task.title, task.description, task.status, task.due_date, now, task_id )) return Task( task_id=task_id, title=task.title, description=task.description, status=task.status, due_date=task.due_date, created_at=existing_task.created_at, updated_at=now ) @router.delete("/tasks/{task_id}") async def delete_task(task_id: uuid.UUID, db: Session = Depends(get_db)): # Check if task exists exists_query = "SELECT * FROM tasks WHERE task_id = %s" existing_task = db.execute(exists_query, [task_id]).one() if not existing_task: raise HTTPException(status_code=404, detail="Task not found") query = "DELETE FROM tasks WHERE task_id = %s" db.execute(query, [task_id]) return {"message": "Task deleted successfully"}
Main Application
Set up our FastAPI application in main.py
:
from fastapi import FastAPI from .routes import tasks app = FastAPI(title="Task Manager API") app.include_router(tasks.router, tags=["tasks"]) @app.get("/") async def root(): return {"message": "Welcome to Task Manager API"}
Running the Application
- Start Cassandra:
docker-compose up -d
- Install dependencies:
pip install -r requirements.txt
- Run the API:
uvicorn app.main:app --reload
Testing the API
Here are some example curl commands to test our API:
# Create a task curl -X POST "http://localhost:8000/tasks/" \ -H "Content-Type: application/json" \ -d '{"title":"Complete project","description":"Finish the API implementation","status":"pending"}' # List all tasks curl "http://localhost:8000/tasks/" # Get a specific task curl "http://localhost:8000/tasks/{task_id}" # Update a task curl -X PUT "http://localhost:8000/tasks/{task_id}" \ -H "Content-Type: application/json" \ -d '{"title":"Complete project","description":"API implementation done","status":"completed"}' # Delete a task curl -X DELETE "http://localhost:8000/tasks/{task_id}"
Performance Considerations
When working with Cassandra, keep in mind:
- Data Modeling: Design tables around your query patterns
- Partition Keys: Choose partition keys that distribute data evenly
- Consistency Levels: Configure based on your use case
- Batch Operations: Use them sparingly and only for related data
- Secondary Indexes: Use them carefully as they can impact performance
Conclusion
This implementation demonstrates how to build a robust task management API using Apache Cassandra. The combination of Cassandra's scalability and FastAPI's performance makes this stack suitable for production applications requiring high availability and consistency.
Future enhancements could include:
- Authentication and authorization
- More complex querying capabilities
- Batch operations
- Advanced task filtering
- Automated testing
The complete source code is available on GitHub: [Link to your repository]
Top comments (0)