Skip to content

nbathreya/Memory-Efficient-Stream-Processor

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Memory-Efficient Stream Processor

Process GB-scale signals with constant memory usage. Demonstrates chunked processing, memory mapping, and zero-copy operations for real-time data pipelines.

Python PyTorch License

Key Features

  • Memory-Mapped I/O: Zero-copy file reading with mmap
  • Constant Memory: Process unlimited file sizes with fixed RAM
  • Chunked Operations: FFT and convolution with overlap-save/add
  • Memory Pooling: Array reuse to minimize allocations

Performance

File Size Peak Memory Processing Time
200 MB 48 MB 2.3s
2 GB 52 MB 18.7s
20 GB 54 MB 186s

Memory efficiency: 40-400x less than naive loading
Throughput: ~110 MB/s on standard SSD

Quick Start

from stream_processor import StreamProcessor, StreamConfig config = StreamConfig(chunk_size=1_000_000, overlap=1000) processor = StreamProcessor(config) # Process large file with constant memory data_iter = processor.process_file_mmap("signal.bin") result = processor.process_stream(data_iter) print(f"Peak memory: {result['stats']['peak_memory_mb']:.1f} MB") print(f"Events found: {result['stats']['events_found']}")

Architecture

Large File (10 GB) │ ▼ ┌──────────────┐ │ Memory Map │ ← Zero-copy view └──────┬───────┘ │ ▼ ┌──────────────┐ │ Chunk Reader │ ← Process 1M samples at a time └──────┬───────┘ │ ▼ ┌──────────────┐ │ Detector │ ← Event detection └──────┬───────┘ │ ▼ ┌──────────────┐ │ Memory Pool │ ← Reuse arrays └──────────────┘ Peak Memory: ~50 MB (constant) 

Installation

pip install numpy psutil

Chunked Operations

FFT with Overlap-Add:

from stream_processor import ChunkedArrayProcessor processor = ChunkedArrayProcessor(chunk_size=1_000_000) # 5x less memory than np.fft.fft() fft_result = processor.chunked_fft(large_signal)

Convolution with Overlap-Save:

kernel = np.array([0.25, 0.5, 0.25]) # Moving average filtered = processor.chunked_convolve(signal, kernel)

Memory Management

Memory pooling:

pool = MemoryPool(max_size=10) # Get from pool (or allocate) arr = pool.get(shape=(1000000,), dtype=np.float32) # Use array... # Return to pool pool.put(arr)

Memory tracking:

# Built-in memory monitoring processor.stats["peak_memory_mb"] # Peak RAM usage

Benchmark

python stream_processor.py

Expected output:

=== Memory-Efficient Stream Processing === Creating 50,000,000 sample signal (~190 MB)... Processing with memory mapping... Results: Total samples: 50,000,000 Chunks processed: 50 Events found: 1,000 Peak memory: 48.3 MB Memory efficiency: 4.0x === Chunked FFT vs Standard FFT === Chunked FFT: 2.34s, 82.1 MB Standard FFT: 2.18s, 458.3 MB Memory savings: 5.6x 

Use Cases

High-throughput signal processing:

  • Process TB-scale sensor data
  • Real-time streaming from hardware
  • Batch processing on memory-constrained systems

Edge computing:

  • Process on embedded devices with limited RAM
  • Cloud cost optimization (smaller instances)

Data pipelines:

  • ETL for time-series databases
  • Feature extraction for ML training

Technical Details

Memory Mapping

with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm: # Zero-copy view into file chunk_bytes = mm[offset:offset+chunk_size] chunk = np.frombuffer(chunk_bytes, dtype=np.float32)

Benefits:

  • OS handles paging automatically
  • No full file load into RAM
  • Shared memory for multiple processes

Overlap Processing

chunk_size = 1_000_000 overlap = 1000 for i in range(0, len(signal), chunk_size - overlap): chunk = signal[i:i+chunk_size] # Process with overlap to avoid edge artifacts

Why overlap?

  • Prevents edge effects in filtering
  • Ensures event continuity across boundaries
  • Minimal overhead (0.1% extra processing)

Comparison

Method Memory Speed Complexity
np.load() O(n) Fast Simple
np.memmap() O(1) Fast Simple
This implementation O(1) Fast Medium

When to use:

  • Files > 1GB and limited RAM
  • Streaming real-time data
  • Processing on edge devices

Advanced Usage

Custom chunk processing:

def custom_detector(chunk, offset): # Your detection logic return events processor._detect_in_chunk = custom_detector

Progress callbacks:

def progress_cb(chunk_num, total): print(f"Processing {chunk_num}/{total}") # Add to processor

Multi-file batch:

from pathlib import Path for filepath in Path("data/").glob("*.bin"): result = processor.process_stream( processor.process_file_mmap(filepath) )

Future Enhancements

  • Multi-threaded chunk processing
  • GPU transfer for chunked computation
  • Distributed processing (Ray/Dask)
  • Apache Arrow zero-copy format

License: MIT | Python: 3.8+ | Memory: Works on 2GB RAM

About

Memory-Efficient Streaming Signal Processor

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages