If you're working with Amazon DocumentDB and want to evaluate how your cluster performs under load, a stress test is one of the best ways to gain confidence in your setup.
In this post, I'll walk you through a simple, effective way to stress test your DocumentDB cluster using Python and `pymongo`.
📦 Prerequisites
Before running the script, make sure you have:
- An active Amazon DocumentDB cluster
- The global CA certificate from Amazon:

```shell
curl -o global-bundle.pem https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem
```

- Python 3.7+ installed
- The `pymongo` driver:

```shell
pip install pymongo
```
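Before launching hundreds of operations, it can be worth verifying that the cluster is reachable at all. A minimal sketch (the `can_connect` helper is my own addition, not part of the script below) uses the cheap `ping` server command, which fails fast when the cluster is unreachable:

```python
def can_connect(client) -> bool:
    """Return True if the DocumentDB cluster answers a `ping` command."""
    try:
        client.admin.command("ping")  # cheap round-trip to the server
        return True
    except Exception:
        return False
```

You'd call it with the same `MongoClient` the script builds, e.g. `can_connect(MongoClient(MONGO_URI, tlsCAFile=CA_CERT_PATH, serverSelectionTimeoutMS=5000))`; the `serverSelectionTimeoutMS` option keeps the check from hanging for the default 30 seconds.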
🧠 What This Script Does
The script:
- Connects to your DocumentDB cluster
- Spawns multiple threads
- Randomly performs `insert`, `read`, or `update` operations
- Measures how long it takes to complete a set of operations

This lets you simulate a concurrent, mixed workload to see how your database performs.
💻 The Code
```python
import random
import string
import time
from pymongo import MongoClient
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configuration
MONGO_URI = (
    "mongodb://myadmin:********@"
    "docdb-dima-1.cluster-xxxxxxxxxxxx.us-east-1.docdb.amazonaws.com:27017"
    "/?ssl=true&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false"
)
CA_CERT_PATH = "global-bundle.pem"
DB_NAME = "stressTestDB"
COLLECTION_NAME = "stressCollection"

# Connect
client = MongoClient(MONGO_URI, tlsCAFile=CA_CERT_PATH)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]


def random_string(length=10):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))


def stress_worker(thread_id):
    ops = ["insert", "read", "update"]
    op = random.choice(ops)
    try:
        if op == "insert":
            doc = {"thread": thread_id, "data": random_string(), "value": random.randint(1, 1000)}
            collection.insert_one(doc)
            return f"[Thread {thread_id}] Insert OK"
        elif op == "read":
            result = collection.find_one({}, {"_id": 0})
            return f"[Thread {thread_id}] Read OK: {result}"
        elif op == "update":
            collection.update_one({}, {"$set": {"updated": time.time()}}, upsert=True)
            return f"[Thread {thread_id}] Update OK"
    except Exception as e:
        return f"[Thread {thread_id}] ERROR: {e}"


def run_stress_test(concurrency=50, iterations=1000):
    start = time.time()
    print(f"Starting stress test: {concurrency} threads x {iterations} ops...")
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [executor.submit(stress_worker, i % concurrency) for i in range(iterations)]
        for future in as_completed(futures):
            print(future.result())
    duration = time.time() - start
    print(f"Test completed in {duration:.2f} seconds")


if __name__ == "__main__":
    run_stress_test(concurrency=20, iterations=200)
```
🧪 Example Output

```
Starting stress test: 20 threads x 200 ops...
[Thread 3] Insert OK
[Thread 7] Read OK: {'thread': 9, 'data': 'xyz456', 'value': 88}
[Thread 12] Update OK
...
Test completed in 11.84 seconds
```
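The total duration is easy to turn into a throughput figure for comparing runs. A quick sketch using the numbers from the example run above (200 operations in 11.84 seconds):

```python
# Values taken from the example run above
iterations = 200
duration = 11.84  # seconds

throughput = iterations / duration
print(f"{throughput:.1f} ops/sec")  # -> 16.9 ops/sec
```

Comparing this figure across runs with different `concurrency` values is a simple way to spot where the cluster stops scaling.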
🔄 Customizing the Test
- Change concurrency by modifying the `concurrency` parameter in `run_stress_test()`.
- Adjust workload size via the `iterations` parameter.
- Modify workload types (e.g., remove `update` or bias toward `read`) by changing the `ops` list in `stress_worker()`.
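For example, the read bias mentioned above can be sketched with `random.choices` and weights instead of `random.choice` (the 70/20/10 split here is a hypothetical choice, not a recommendation):

```python
import random

# Weighted op selection: a read-heavy mix instead of a uniform one.
OPS = ["read", "insert", "update"]
WEIGHTS = [0.7, 0.2, 0.1]  # hypothetical 70/20/10 split

def pick_op():
    # random.choices returns a list of k samples; take the single draw
    return random.choices(OPS, weights=WEIGHTS, k=1)[0]

# Rough sanity check of the distribution over many draws
counts = {op: 0 for op in OPS}
for _ in range(10_000):
    counts[pick_op()] += 1
print(counts)
```

Inside `stress_worker()`, you would replace `op = random.choice(ops)` with `op = pick_op()`.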
⚠️ Important Notes
- This test writes data, so you may want to clean up your test database afterward.
- Make sure you're not violating AWS quotas or triggering alarms.
- Always test in a non-production environment first.
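The cleanup mentioned above can be a one-liner with `drop_database`. A hedged sketch (`cleanup` is a hypothetical helper; `client` is the same `MongoClient` the script builds):

```python
def cleanup(client, db_name="stressTestDB"):
    """Drop the stress-test database once the run is finished."""
    client.drop_database(db_name)
    print(f"Dropped {db_name}")
```

Dropping the whole database removes the collection and all test documents in one call, so nothing lingers between runs.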
✅ Conclusion
Stress testing your DocumentDB cluster helps you prepare for production-scale workloads. This lightweight script is a great starting point for load testing, performance tuning, or monitoring how your system reacts under pressure.