The Parallel Indexing Pipeline is the core indexing engine that transforms raw Nginx log files into searchable Bleve indexes. It implements high-performance parallel document processing using worker pools, SIMD-optimized parsing, adaptive batch sizing, and checkpoint-based recovery. This page focuses on the ParallelIndexer architecture, its job processing pipeline, shard distribution strategy, and incremental indexing capabilities.
For log file discovery and metadata management, see Log Discovery and File Management. For search query execution, see Search and Query System.
Sources: internal/nginx_log/indexer/parallel_indexer.go1-1500 internal/nginx_log/indexer/types.go1-513 internal/nginx_log/indexer/parser.go1-279
The ParallelIndexer orchestrates high-throughput log indexing through a multi-stage pipeline with concurrent workers, intelligent batching, and shard-aware document routing.
Diagram: ParallelIndexer Component Architecture
Key Fields:
| Field | Type | Purpose | 
|---|---|---|
| workers | []*indexWorker | Pool of worker goroutines processing jobs |
| jobQueue | chan *IndexJob | Buffered channel for incoming indexing jobs |
| resultQueue | chan *IndexResult | Buffered channel for job results |
| shardManager | ShardManager | Routes documents to appropriate shards |
| adaptiveOptimizer | *AdaptiveOptimizer | Dynamic worker and batch size tuning |
| zeroAllocProcessor | *ZeroAllocBatchProcessor | Object pooling for memory efficiency |
| rotationScanner | *RotationScanner | Optimized scanning of rotated log files |
Sources: internal/nginx_log/indexer/parallel_indexer.go18-52 internal/nginx_log/indexer/types.go47-92
Sources: internal/nginx_log/indexer/parallel_indexer.go62-121 internal/nginx_log/indexer/parallel_indexer.go124-174
The indexing pipeline processes documents through multiple stages with concurrent workers and intelligent batching.
IndexJob Structure:
Job Submission Methods:
| Method | Behavior | Use Case | 
|---|---|---|
| IndexDocument(ctx, doc) | Synchronous, blocks until complete | Single document, immediate feedback |
| IndexDocuments(ctx, docs) | Synchronous, blocks until complete | Batch, wait for completion |
| IndexDocumentAsync(doc, callback) | Asynchronous, returns immediately | Non-blocking, callback on completion |
| IndexDocumentsAsync(docs, callback) | Asynchronous, returns immediately | Batch non-blocking |
Sources: internal/nginx_log/indexer/parallel_indexer.go308-389 internal/nginx_log/indexer/types.go205-210
Each indexWorker runs in its own goroutine, continuously processing jobs from the queue:
Sources: internal/nginx_log/indexer/parallel_indexer.go940-977 internal/nginx_log/indexer/parallel_indexer.go979-1039
The GroupedShardManager implements content-aware document routing to optimize query performance by co-locating related documents.
Diagram: GroupedShardManager.GetShardForDocument() Flow
GroupedShardManager.GetShardForDocument() Implementation:
From internal/nginx_log/indexer/grouped_shard_manager.go150-170:
Input: mainLogPath string, documentID string

Process:

1. Validate mainLogPath is non-empty (required for grouped sharding)
2. Create an FNV-1a 32-bit hasher
3. Write the mainLogPath bytes to the hasher and compute hash = Sum32()
4. Compute shardID = hash % uint32(len(shards))
5. Retrieve the bleve.Index from gsm.shards[shardID]
6. Return (index, int(shardID), nil)

Benefits:
| Benefit | Explanation | 
|---|---|
| Query Locality | All documents from same log group in one shard → single-shard queries | 
| Balanced Distribution | Hash-based routing distributes groups evenly across shards | 
| Stable Routing | Same main_log_path always routes to same shard | 
| Rebuild Safety | Deterministic routing ensures consistent shard placement | 
Sources: internal/nginx_log/indexer/parallel_indexer.go986-1004 internal/nginx_log/indexer/types.go296-311
Workers batch documents before writing to Bleve for efficiency:
Sources: internal/nginx_log/indexer/parallel_indexer.go1007-1039
The parsing subsystem uses vectorized instructions (SIMD) to achieve 7-8x faster log line processing with 70% memory reduction.
Diagram: Global Parser Singleton and API
Parser Initialization:
Sources: internal/nginx_log/indexer/parser.go23-52 internal/nginx_log/indexer/parser.go60-76
Performance Characteristics:
| Metric | Standard Parser | SIMD Parser | Improvement | 
|---|---|---|---|
| Throughput | ~50,000 lines/sec | ~400,000 lines/sec | 8x faster | 
| Memory per line | ~2 KB | ~600 bytes | 70% reduction | 
| Allocations | ~50 per line | ~3 per line | 94% reduction | 
| Cache hit rate | N/A | 85% (UA parsing) | N/A | 
Sources: internal/nginx_log/indexer/parser.go79-110 internal/nginx_log/indexer/parser.go172-231
Object Pools (utils package):
- LogStringBuilderPool - Reusable string builders for ID generation
- BufferPool - Byte buffers for I/O operations
- DocumentPool - Reusable document structures

Sources: internal/nginx_log/indexer/parser.go173-176
The indexer uses dynamic batch sizing to balance throughput and memory usage based on system resources.
Diagram: Adaptive Batch Size Selection
Default Batch Sizes (CPU-aware, from types.go:62-91):
| CPU Cores | Batch Size | Worker Count | Shard Count | Queue Size | 
|---|---|---|---|---|
| 2-3 | 15,000 | 6-9 (3×) | 4 | 150,000 | 
| 4-7 | 18,000 | 12-21 (3×) | 4 | 180,000 | 
| 8-15 | 20,000 | 24-45 (3×) | 8 | 200,000 | 
| 16+ | 25,000 | 48+ (3×) | 16 | 250,000 | 
Formula from DefaultIndexerConfig():
Sources: internal/nginx_log/indexer/types.go62-91
The BatchWriter (types.go:288-294) provides streaming document submission:
Diagram: BatchWriter Streaming Pattern
Usage Example:
Sources: internal/nginx_log/indexer/parallel_indexer.go391-398
The ZeroAllocBatchProcessor uses object pooling to eliminate allocations during batch operations:
Pool Statistics:
Sources: internal/nginx_log/indexer/parallel_indexer.go96-99 internal/nginx_log/indexer/parallel_indexer.go409-414
The indexing system implements robust incremental indexing with checkpoint tracking to enable resume-on-failure and efficient re-indexing.
Diagram: NginxLogIndex Database Schema
Checkpoint Fields (from model/nginx_log_index.go and persistence.go:69-125):
| Field | Type | Purpose | Update Trigger | 
|---|---|---|---|
| last_position | INTEGER | Byte offset in file for resume | After each batch (every 1000 docs) |
| last_size | INTEGER | File size at checkpoint | After each batch |
| last_modified | TIMESTAMP | File mtime (for change detection) | At indexing start |
| last_indexed | TIMESTAMP | When indexing completed | On completion |
| document_count | INTEGER | Total indexed documents | On completion |
| timerange_start | TIMESTAMP | Earliest log entry timestamp | On completion |
| timerange_end | TIMESTAMP | Latest log entry timestamp | On completion |
| index_status | TEXT | Current status (queued/indexing/indexed/error) | On status change |
| index_duration | INTEGER | Indexing duration (ms) | On completion |
Sources: internal/nginx_log/indexer/persistence.go69-125 model/nginx_log_index.go
Recovery Logic:
On startup or failure recovery:

1. Load checkpoint from database
2. Check file modification time and size
3. Determine indexing strategy:
   - If file unchanged: Skip
   - If file grew: Resume from LastPosition
   - If file smaller: Full re-index (rotation detected)
   - If file modified but same size: Full re-index (overwrite)
4. Index with periodic checkpoint updates
5. Save final checkpoint on completion

Sources: internal/nginx_log/indexer/persistence.go128-156 internal/nginx_log/indexer/persistence.go209-222
Checkpoint Update Frequency:
Sources: internal/nginx_log/indexer/parallel_indexer.go1041-1147
The indexer dynamically scales its worker pool based on system load and queue depth to maximize throughput while preventing resource exhaustion.
Diagram: AdaptiveOptimizer Feedback Loop
Scaling Thresholds:
| Condition | Threshold | Action | Limits | 
|---|---|---|---|
| Queue depth high | > 80% full | Add workers | Max = CPUs × 4 | 
| Workers idle | > 50% idle time | Remove workers | Min = 2 | 
| Throughput declining | < 80% of baseline | Increase batch size | N/A | 
| Memory pressure | > 90% of quota | Decrease batch size | N/A | 
Sources: internal/nginx_log/indexer/parallel_indexer.go158-169 internal/nginx_log/indexer/parallel_indexer.go176-254
Sources: internal/nginx_log/indexer/parallel_indexer.go209-254
The IndexLogGroup() and IndexLogGroupWithRotationScanning() methods handle indexing of entire log groups including all rotated files.
File Discovery:
Input:   basePath = "/var/log/nginx/access.log"
Glob:    "/var/log/nginx/access.log*"
Matches:
- /var/log/nginx/access.log
- /var/log/nginx/access.log.1
- /var/log/nginx/access.log.2.gz
- /var/log/nginx/access.log.2024-01-15.gz
Output:  All regular files (deduplicated)

Sources: internal/nginx_log/indexer/parallel_indexer.go706-772
The RotationScanner prioritizes files to maximize perceived frontend progress:
Priority Formula:
priority = (file_size * 0.7) + (recency_score * 0.3)
recency_score = 1.0 - (age_days / 365.0)

Benefits:
Sources: internal/nginx_log/indexer/parallel_indexer.go774-873
Change Detection:
if file_size < last_size:
    // File was rotated
    strategy = FULL_REINDEX
else if file_size == last_size AND file_mtime == last_modified:
    // No change
    strategy = SKIP
else:
    // File grew
    strategy = INCREMENTAL (resume from last_position)

Sources: internal/nginx_log/indexer/parallel_indexer.go875-920 internal/nginx_log/indexer/rebuild.go323-394
The indexer provides extensive configuration options with automatic CPU-aware defaults.
CPU-Aware Defaults:
Sources: internal/nginx_log/indexer/types.go62-91
| Profile | Workers | Batch Size | Use Case | 
|---|---|---|---|
| default | CPUs × 3 | 15-25k | Balanced performance |
| high_throughput | CPUs × 4 | 30-50k | Maximum indexing speed |
| low_latency | CPUs × 1.5 | 500 | Real-time updates |
| memory_constrained | CPUs ÷ 2 | 250 | Limited RAM (256MB quota) |
| cpu_intensive | CPUs × 4 | 25-45k | Multi-core optimization |
| max_performance | CPUs × 5 | 40-60k | All resources (2GB quota) |
Sources: internal/nginx_log/indexer/types.go93-170
Sources: internal/nginx_log/indexer/parallel_indexer.go510-539
The indexer implements comprehensive error handling with automatic recovery strategies.
Retry Strategy:
max_retries = 3
backoff_base = 1 second

for attempt in 1..max_retries:
    try:
        perform_operation()
        break
    except error:
        if is_transient(error):
            wait = backoff_base * (2 ^ attempt)
            sleep(wait)
        else:
            break

Sources: internal/nginx_log/indexer/persistence.go373-405
Recovery Scenarios:
| Scenario | Detection | Recovery | 
|---|---|---|
| Process crash | IndexStatus = "indexing" on startup | Resume from LastPosition | 
| Disk full | Write error during batch | Reduce batch size, retry | 
| Corrupt shard | Health check failure | Recreate shard from scratch | 
| Network error | I/O timeout | Exponential backoff retry | 
Sources: internal/nginx_log/indexer/parallel_indexer.go1041-1147
The indexer coordinates with the searcher through zero-downtime shard swapping.
Diagram: UpdateSearcherShards() Zero-Downtime Swap
UpdateSearcherShards() Implementation (modern_services.go:484-570):
Function Flow:

1. Schedule async update to avoid blocking (go updateSearcherShardsAsync())
2. Wait 500ms for indexing operations to settle
3. Acquire write lock (servicesMutex.Lock())
4. Check servicesInitialized && globalIndexer != nil
5. Verify indexer.IsHealthy()
6. Get newShards := globalIndexer.GetAllShards()
7. If globalSearcher == nil:
   - Create initial searcher with NewSearcher(config, newShards)
   - Create analytics service
   Else:
   - Call globalSearcher.SwapShards(newShards)
   - IndexAlias performs atomic pointer swap via Swap()
8. Verify globalSearcher.IsHealthy() and IsRunning()
9. Release lock (servicesMutex.Unlock())

Zero-Downtime Guarantees:
- Searcher.IndexAlias uses atomic pointer operations (Swap method)

Sources: internal/nginx_log/modern_services.go484-570 internal/nginx_log/searcher/distributed_searcher.go
Health Check Methods:
Sources: internal/nginx_log/indexer/parallel_indexer.go542-587
The indexing system exposes HTTP endpoints (registered in api/nginx_log routes):
| Endpoint | Method | Purpose | Handler Function | 
|---|---|---|---|
| /api/nginx_log/rebuild_index | POST | Rebuild all or single file | RebuildIndex() (index_management.go:64) |
| /api/nginx_log/destroy_index | POST | Delete all index data | DestroyIndex() (index_management.go) |
| /api/nginx_log/list | GET | Get log files with index status | GetList() (nginx_log.go) |
| /api/nginx_log/preflight | GET | Check if logs are indexed | GetLogPreflight() (analytics.go:127) |
| /api/nginx_log/advanced_search | POST | Search indexed logs | AdvancedSearch() (analytics.go) |
Request/Response Examples:
Async Processing:
Sources: api/nginx_log/index_management.go64-116
The Vue.js frontend integrates with the indexing system through:
Key Vue Components (app/src/views/nginx_log/):
| Component | File Path | Purpose | Key Methods | 
|---|---|---|---|
| NginxLogList.vue | app/src/views/nginx_log/NginxLogList.vue1-552 | Log file list with index status columns | checkAdvancedIndexingStatus(), rebuildFileIndex() |
| IndexProgressBar.vue | app/src/views/nginx_log/indexing/components/IndexProgressBar.vue | Per-file progress visualization | Displays progress.percent, progress.stage |
| IndexManagement.vue | app/src/views/nginx_log/indexing/IndexManagement.vue | Bulk rebuild controls (Rebuild All, Destroy All) | handleRebuildAll(), handleDestroyAll() |
| useIndexProgress.ts | app/src/views/nginx_log/composables/useIndexProgress.ts | Pinia store for progress state | getProgressForFile(), isGlobalIndexing |
| StructuredLogViewer.vue | app/src/views/nginx_log/structured/StructuredLogViewer.vue1-700 | Search interface with preflight checks | checkIfIndexed(), performAdvancedSearch() |
WebSocket Event Handling (via useWebSocketEventBus):
Event Types:
- processing_status - Global indexing state (true/false)
- nginx_log_index_ready - Searcher shard swap completed
- nginx_log_index_progress - Per-file indexing progress (percent, lines, stage)
- nginx_log_index_complete - File indexing completed (success/error)

Sources: app/src/views/nginx_log/NginxLogList.vue89-135 app/src/views/nginx_log/structured/StructuredLogViewer.vue30-145
The indexing system uses a single table for all metadata tracking:
Table: nginx_log_indexes
Sources: internal/nginx_log/indexer/persistence.go69-125 model/nginx_log_index.go1-50
Index Statistics:
Health Checks:
Sources: internal/nginx_log/indexer/parallel_indexer.go510-539 internal/nginx_log/indexer/parallel_indexer.go580-587
The system implements automatic recovery for common failure scenarios:
- Resume from last_position on restart

Retry Strategy:
retry_count = 0
max_retries = 3
backoff = 1 second

while retry_count < max_retries:
    try:
        index_file()
        break
    except error:
        retry_count++
        sleep(backoff * 2^retry_count)

if retry_count == max_retries:
    set_index_status("error")

Sources: internal/nginx_log/indexer/persistence.go373-405
Typical performance on modern hardware (8-core CPU, NVMe SSD):
| Operation | Throughput | Latency | 
|---|---|---|
| Parse log lines | 400,000 lines/sec | N/A | 
| Index documents | 50,000 docs/sec | N/A | 
| Full rebuild (1M docs) | ~20 seconds | N/A | 
| Single file rebuild | ~5 seconds | N/A | 
| Search query | N/A | 10-50ms | 
| Facet aggregation | N/A | 50-200ms | 
Factors Affecting Performance: