DEV Community

Cover image for How to Process Multi-Gigabyte JSON Files in Python: Memory-Efficient Techniques That Work
Aarav Joshi
Aarav Joshi

Posted on

How to Process Multi-Gigabyte JSON Files in Python: Memory-Efficient Techniques That Work

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Python provides powerful tools for handling large JSON datasets without overwhelming your system resources. I've processed multi-gigabyte JSON files on modest hardware using these techniques, saving both time and computational resources.

Understanding the Challenge

Large JSON datasets present unique challenges. A common mistake is attempting to load entire files with json.load(), which quickly exhausts memory on large datasets. Instead, we need approaches that process data incrementally.

Stream Processing with ijson

The ijson library delivers exceptional performance for parsing large JSON files incrementally. This approach reads only what's needed without loading everything into memory.

import ijson # Process a large JSON file containing an array of objects with open('massive_dataset.json', 'rb') as f: # Extract only objects within the "customers" array  for customer in ijson.items(f, 'customers.item'): # Process each customer individually  name = customer.get('name') email = customer.get('email') process_customer_data(name, email) 
Enter fullscreen mode Exit fullscreen mode

This technique works particularly well for JSON files with predictable structures. I recently used ijson to process a 12GB customer dataset on a laptop with only 8GB RAM - something impossible with standard methods.

Line-by-Line Processing

For newline-delimited JSON (NDJSON) files, where each line contains a complete JSON object, simple line-by-line processing works efficiently:

import json def process_json_lines(filename): with open(filename, 'r') as f: for line in f: if line.strip(): # Skip empty lines  record = json.loads(line) yield record # Usage for item in process_json_lines('large_records.jsonl'): # Process each item with minimal memory overhead  print(item['id']) 
Enter fullscreen mode Exit fullscreen mode

I prefer this method for log processing tasks, where each log entry is a separate JSON object.

Memory-Mapped Files

When you need random access to different parts of a JSON file, memory-mapped files provide excellent performance without loading everything:

import mmap import json import re def find_json_objects(filename, pattern): with open(filename, 'r+b') as f: # Create memory-mapped file  mm = mmap.mmap(f.fileno(), 0) # Search for pattern in the file  pattern_compiled = re.compile(pattern.encode()) # Find all matches  for match in pattern_compiled.finditer(mm): # Extract the JSON object containing the match  start_pos = mm.rfind(b'{', 0, match.start()) end_pos = mm.find(b'}', match.end()) if start_pos != -1 and end_pos != -1: json_bytes = mm[start_pos:end_pos+1] try: yield json.loads(json_bytes) except json.JSONDecodeError: # Handle parsing errors  pass mm.close() # Usage for obj in find_json_objects('analytics_data.json', 'error_code'): log_error(obj) 
Enter fullscreen mode Exit fullscreen mode

This technique saved me countless hours when searching for specific error patterns in large application logs.

Chunked Processing

Breaking down large files into manageable chunks balances memory usage and processing efficiency:

import json def process_in_chunks(filename, chunk_size=1000): chunk = [] with open(filename, 'r') as f: # Assuming JSON file contains an array of objects  f.readline() # Skip the opening '['  for line in f: line = line.strip() if line.endswith(','): line = line[:-1] if line and line != ']': try: item = json.loads(line) chunk.append(item) if len(chunk) >= chunk_size: yield chunk chunk = [] except json.JSONDecodeError: # Handle malformed JSON  continue if chunk: # Don't forget the last chunk  yield chunk # Usage for batch in process_in_chunks('product_catalog.json', 500): db.bulk_insert(batch) 
Enter fullscreen mode Exit fullscreen mode

This pattern works well for database operations, where batch processing is significantly faster than individual inserts.

Compressed JSON Processing

Working directly with compressed files reduces disk I/O and memory usage:

import json import gzip def process_compressed_json(filename): with gzip.open(filename, 'rt', encoding='utf-8') as f: # For a JSON array structure  data = json.load(f) for item in data: yield item # Alternatively, for line-delimited JSON def process_compressed_jsonl(filename): with gzip.open(filename, 'rt', encoding='utf-8') as f: for line in f: if line.strip(): yield json.loads(line) # Usage for record in process_compressed_jsonl('logs.jsonl.gz'): analyze_log_entry(record) 
Enter fullscreen mode Exit fullscreen mode

I routinely compress our historical datasets to 10-20% of their original size while maintaining fast access.

JSON Path Extraction

For targeted data extraction, JSON Path expressions provide precise selection:

import json from jsonpath_ng import parse def extract_with_jsonpath(filename, json_path): with open(filename, 'r') as f: data = json.load(f) # Compile the JSONPath expression  jsonpath_expr = parse(json_path) # Find all matches  return [match.value for match in jsonpath_expr.find(data)] # Usage - extract all prices from a product catalog prices = extract_with_jsonpath('catalog.json', '$..price') 
Enter fullscreen mode Exit fullscreen mode

For larger files, combine this with chunked processing:

def extract_with_jsonpath_chunked(filename, json_path, chunk_size=100): jsonpath_expr = parse(json_path) for chunk in process_in_chunks(filename, chunk_size): for item in chunk: for match in jsonpath_expr.find(item): yield match.value 
Enter fullscreen mode Exit fullscreen mode

This approach works best when you need specific fields from a complex JSON structure.

Parallel Processing

For multi-core machines, parallel processing delivers significant speed improvements:

import json from concurrent.futures import ProcessPoolExecutor import os def process_partition(filename, start_pos, end_pos): results = [] with open(filename, 'r') as f: f.seek(start_pos) # Read to the first complete line if not at file start  if start_pos != 0: f.readline() line = f.readline() while line and f.tell() <= end_pos: try: record = json.loads(line) # Process the record  results.append(transform_record(record)) except json.JSONDecodeError: pass line = f.readline() return results def parallel_process_json(filename, num_workers=None): if num_workers is None: num_workers = os.cpu_count() # Get file size  file_size = os.path.getsize(filename) # Calculate partition sizes  chunk_size = file_size // num_workers # Create tasks  tasks = [] for i in range(num_workers): start = i * chunk_size end = (i + 1) * chunk_size if i < num_workers - 1 else file_size tasks.append((filename, start, end)) # Process in parallel  all_results = [] with ProcessPoolExecutor(max_workers=num_workers) as executor: for result in executor.map(lambda p: process_partition(*p), tasks): all_results.extend(result) return all_results 
Enter fullscreen mode Exit fullscreen mode

On my 8-core processor, this approach processes files nearly 6 times faster than sequential methods.

Combining Techniques for Maximum Efficiency

For truly massive datasets, I often combine multiple techniques:

import ijson import gzip from concurrent.futures import ThreadPoolExecutor def process_compressed_stream(filename, batch_size=1000): batch = [] with gzip.open(filename, 'rb') as f: # Stream-parse the JSON data  parser = ijson.items(f, 'item') for item in parser: batch.append(item) if len(batch) >= batch_size: yield batch batch = [] if batch: # Don't forget the last batch  yield batch def process_batch(batch): # Process a batch of records  results = [] for item in batch: # Do some transformation  transformed = transform_data(item) results.append(transformed) # Bulk save to database  save_to_database(results) return len(results) def main(): filename = 'massive_dataset.json.gz' total_processed = 0 # Create a thread pool  with ThreadPoolExecutor(max_workers=4) as executor: futures = [] # Submit batch processing tasks  for batch in process_compressed_stream(filename): future = executor.submit(process_batch, batch) futures.append(future) # Collect results  for future in futures: total_processed += future.result() print(f"Processed {total_processed} records") if __name__ == "__main__": main() 
Enter fullscreen mode Exit fullscreen mode

This implementation streams from a compressed file while processing batches in parallel threads.

Transforming Data Efficiently

When transforming large datasets, generator functions maintain memory efficiency:

def transform_stream(data_stream): for item in data_stream: # Apply transformations  if 'name' in item: item['name'] = item['name'].upper() if 'timestamp' in item: item['date'] = convert_timestamp_to_date(item['timestamp']) yield item # Usage with our previous function for batch in transform_stream(process_json_lines('data.jsonl')): write_to_output(batch) 
Enter fullscreen mode Exit fullscreen mode

This approach allows transforming unlimited amounts of data with minimal memory usage.

Real-world Application

In a recent project, I needed to analyze several years of user interaction data (over 50GB). By combining streaming, batching, and parallel processing, the task completed in hours rather than days:

def analyze_user_interactions(): # Process multiple large files  file_list = glob.glob('user_data_*.json.gz') total_interactions = 0 user_stats = {} for filename in file_list: print(f"Processing {filename}") # Process file with our stream processor  for batch in process_compressed_stream(filename): # Update statistics  for interaction in batch: user_id = interaction.get('user_id') action = interaction.get('action') if user_id and action: if user_id not in user_stats: user_stats[user_id] = {'actions': {}} if action not in user_stats[user_id]['actions']: user_stats[user_id]['actions'][action] = 0 user_stats[user_id]['actions'][action] += 1 total_interactions += 1 print(f"Analyzed {total_interactions} interactions across {len(user_stats)} users") return user_stats 
Enter fullscreen mode Exit fullscreen mode

The key to success with large JSON datasets is processing the data incrementally, keeping memory usage low, and leveraging parallel processing where possible. With these techniques, you can handle virtually any size of JSON data, even on modest hardware.


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva

Top comments (0)