Parallel Indexing Pipeline

Purpose and Scope

The Parallel Indexing Pipeline is the core indexing engine that transforms raw Nginx log files into searchable Bleve indexes. It implements high-performance parallel document processing using worker pools, SIMD-optimized parsing, adaptive batch sizing, and checkpoint-based recovery. This page focuses on the ParallelIndexer architecture, its job processing pipeline, shard distribution strategy, and incremental indexing capabilities.

For log file discovery and metadata management, see Log Discovery and File Management. For search query execution, see Search and Query System.

Sources: internal/nginx_log/indexer/parallel_indexer.go1-1500 internal/nginx_log/indexer/types.go1-513 internal/nginx_log/indexer/parser.go1-279


ParallelIndexer Architecture

The ParallelIndexer orchestrates high-throughput log indexing through a multi-stage pipeline with concurrent workers, intelligent batching, and shard-aware document routing.

ParallelIndexer Structure

Diagram: ParallelIndexer Component Architecture

Key Fields:

| Field | Type | Purpose |
|---|---|---|
| workers | []*indexWorker | Pool of worker goroutines processing jobs |
| jobQueue | chan *IndexJob | Buffered channel for incoming indexing jobs |
| resultQueue | chan *IndexResult | Buffered channel for job results |
| shardManager | ShardManager | Routes documents to appropriate shards |
| adaptiveOptimizer | *AdaptiveOptimizer | Dynamic worker and batch size tuning |
| zeroAllocProcessor | *ZeroAllocBatchProcessor | Object pooling for memory efficiency |
| rotationScanner | *RotationScanner | Optimized scanning of rotated log files |

Sources: internal/nginx_log/indexer/parallel_indexer.go18-52 internal/nginx_log/indexer/types.go47-92

Initialization and Startup

Sources: internal/nginx_log/indexer/parallel_indexer.go62-121 internal/nginx_log/indexer/parallel_indexer.go124-174


Job Processing Pipeline

The indexing pipeline processes documents through multiple stages with concurrent workers and intelligent batching.

Job Submission Flow

IndexJob Structure:

Job Submission Methods:

| Method | Behavior | Use Case |
|---|---|---|
| IndexDocument(ctx, doc) | Synchronous, blocks until complete | Single document, immediate feedback |
| IndexDocuments(ctx, docs) | Synchronous, blocks until complete | Batch, wait for completion |
| IndexDocumentAsync(doc, callback) | Asynchronous, returns immediately | Non-blocking, callback on completion |
| IndexDocumentsAsync(docs, callback) | Asynchronous, returns immediately | Batch, non-blocking |

Sources: internal/nginx_log/indexer/parallel_indexer.go308-389 internal/nginx_log/indexer/types.go205-210

Worker Execution Model

Each indexWorker runs in its own goroutine, continuously processing jobs from the queue:

Sources: internal/nginx_log/indexer/parallel_indexer.go940-977 internal/nginx_log/indexer/parallel_indexer.go979-1039


Shard Distribution and Routing

The GroupedShardManager implements content-aware document routing to optimize query performance by co-locating related documents.

Shard Routing Strategy

Diagram: GroupedShardManager.GetShardForDocument() Flow

GroupedShardManager.GetShardForDocument() Implementation:

From internal/nginx_log/indexer/grouped_shard_manager.go150-170:

Input: mainLogPath string, documentID string

Process:

  1. Validate mainLogPath is non-empty (required for grouped sharding)
  2. Create an FNV-1a 32-bit hasher
  3. Compute hash = fnv1a.New32().Write([]byte(mainLogPath)).Sum32()
  4. Compute shardID = hash % uint32(len(shards))
  5. Retrieve the bleve.Index from gsm.shards[shardID]
  6. Return (index, int(shardID), nil)

Benefits:

| Benefit | Explanation |
|---|---|
| Query locality | All documents from the same log group live in one shard, enabling single-shard queries |
| Balanced distribution | Hash-based routing distributes groups evenly across shards |
| Stable routing | The same main_log_path always routes to the same shard |
| Rebuild safety | Deterministic routing ensures consistent shard placement |

Sources: internal/nginx_log/indexer/parallel_indexer.go986-1004 internal/nginx_log/indexer/types.go296-311

Batch Writing to Shards

Workers batch documents before writing to Bleve for efficiency:

Sources: internal/nginx_log/indexer/parallel_indexer.go1007-1039


SIMD-Optimized Parsing

The parsing subsystem uses vectorized instructions (SIMD) to achieve 7-8x faster log line processing with 70% memory reduction.

Parser Architecture

Diagram: Global Parser Singleton and API

Parser Initialization:

Sources: internal/nginx_log/indexer/parser.go23-52 internal/nginx_log/indexer/parser.go60-76

Parsing Pipeline

Performance Characteristics:

| Metric | Standard Parser | SIMD Parser | Improvement |
|---|---|---|---|
| Throughput | ~50,000 lines/sec | ~400,000 lines/sec | 8x faster |
| Memory per line | ~2 KB | ~600 bytes | 70% reduction |
| Allocations | ~50 per line | ~3 per line | 94% reduction |
| Cache hit rate | N/A | 85% (UA parsing) | N/A |

Sources: internal/nginx_log/indexer/parser.go79-110 internal/nginx_log/indexer/parser.go172-231

Memory Pool Optimization

Object Pools (utils package):

  • LogStringBuilderPool - Reusable string builders for ID generation
  • BufferPool - Byte buffers for I/O operations
  • DocumentPool - Reusable document structures

Sources: internal/nginx_log/indexer/parser.go173-176


Batching Strategies

The indexer uses dynamic batch sizing to balance throughput and memory usage based on system resources.

Adaptive Batch Sizing

Diagram: Adaptive Batch Size Selection

Default Batch Sizes (CPU-aware, from types.go:62-91):

| CPU Cores | Batch Size | Worker Count | Shard Count | Queue Size |
|---|---|---|---|---|
| 2-3 | 15,000 | 6-9 (3×) | 4 | 150,000 |
| 4-7 | 18,000 | 12-21 (3×) | 4 | 180,000 |
| 8-15 | 20,000 | 24-45 (3×) | 8 | 200,000 |
| 16+ | 25,000 | 48+ (3×) | 16 | 250,000 |

Formula from DefaultIndexerConfig():

Sources: internal/nginx_log/indexer/types.go62-91

Batch Writer API

The BatchWriter (types.go:288-294) provides streaming document submission:

Diagram: BatchWriter Streaming Pattern

Usage Example:

Sources: internal/nginx_log/indexer/parallel_indexer.go391-398

Zero-Allocation Batch Processing

The ZeroAllocBatchProcessor uses object pooling to eliminate allocations during batch operations:

Pool Statistics:

Sources: internal/nginx_log/indexer/parallel_indexer.go96-99 internal/nginx_log/indexer/parallel_indexer.go409-414


Checkpoint-Based Recovery

The indexing system implements robust incremental indexing with checkpoint tracking to enable resume-on-failure and efficient re-indexing.

Checkpoint Storage

Diagram: NginxLogIndex Database Schema

Checkpoint Fields (from model/nginx_log_index.go and persistence.go:69-125):

| Field | Type | Purpose | Update Trigger |
|---|---|---|---|
| last_position | INTEGER | Byte offset in file for resume | After each batch (every 1,000 docs) |
| last_size | INTEGER | File size at checkpoint | After each batch |
| last_modified | TIMESTAMP | File mtime (for change detection) | At indexing start |
| last_indexed | TIMESTAMP | When indexing completed | On completion |
| document_count | INTEGER | Total indexed documents | On completion |
| timerange_start | TIMESTAMP | Earliest log entry timestamp | On completion |
| timerange_end | TIMESTAMP | Latest log entry timestamp | On completion |
| index_status | TEXT | Current status (queued/indexing/indexed/error) | On status change |
| index_duration | INTEGER | Indexing duration (ms) | On completion |

Sources: internal/nginx_log/indexer/persistence.go69-125 model/nginx_log_index.go

Incremental Indexing Flow

Recovery Logic:

On startup or failure recovery:

  1. Load the checkpoint from the database
  2. Check the file's modification time and size
  3. Determine the indexing strategy:
     • If the file is unchanged: skip
     • If the file grew: resume from LastPosition
     • If the file shrank: full re-index (rotation detected)
     • If the file was modified but is the same size: full re-index (overwrite)
  4. Index with periodic checkpoint updates
  5. Save the final checkpoint on completion

Sources: internal/nginx_log/indexer/persistence.go128-156 internal/nginx_log/indexer/persistence.go209-222

Progress Tracking with Checkpoints

Checkpoint Update Frequency:

  • Every 1000 documents during active indexing
  • Every 5 seconds (time-based)
  • On completion (final checkpoint)
  • On error (save position for recovery)

Sources: internal/nginx_log/indexer/parallel_indexer.go1041-1147


Worker Pool Optimization

The indexer dynamically scales its worker pool based on system load and queue depth to maximize throughput while preventing resource exhaustion.

Adaptive Worker Scaling

Diagram: AdaptiveOptimizer Feedback Loop

Scaling Thresholds:

| Condition | Threshold | Action | Limits |
|---|---|---|---|
| Queue depth high | > 80% full | Add workers | Max = CPUs × 4 |
| Workers idle | > 50% idle time | Remove workers | Min = 2 |
| Throughput declining | < 80% of baseline | Increase batch size | N/A |
| Memory pressure | > 90% of quota | Decrease batch size | N/A |

Sources: internal/nginx_log/indexer/parallel_indexer.go158-169 internal/nginx_log/indexer/parallel_indexer.go176-254

Worker Lifecycle

Sources: internal/nginx_log/indexer/parallel_indexer.go209-254


Full Log Group Indexing

The IndexLogGroup() and IndexLogGroupWithRotationScanning() methods handle indexing of entire log groups including all rotated files.

Log Group Discovery

File Discovery:

```
Input:   basePath = "/var/log/nginx/access.log"
Glob:    "/var/log/nginx/access.log*"
Matches:
  /var/log/nginx/access.log
  /var/log/nginx/access.log.1
  /var/log/nginx/access.log.2.gz
  /var/log/nginx/access.log.2024-01-15.gz
Output:  all regular files, deduplicated
```

Sources: internal/nginx_log/indexer/parallel_indexer.go706-772

Rotation Scanner Optimization

The RotationScanner prioritizes files to maximize perceived frontend progress:

Priority Formula:

```
priority      = (file_size * 0.7) + (recency_score * 0.3)
recency_score = 1.0 - (age_days / 365.0)
```

Benefits:

  • Large files indexed first → visible progress quickly
  • Recent logs prioritized → higher user value
  • Compressed archives batched efficiently
  • Parallelism maintained across multiple groups

Sources: internal/nginx_log/indexer/parallel_indexer.go774-873

Incremental vs Full Indexing

Change Detection:

```
if file_size < last_size:
    strategy = FULL_REINDEX      // file was rotated
else if file_size == last_size and file_mtime == last_modified:
    strategy = SKIP              // no change
else:
    strategy = INCREMENTAL       // file grew; resume from last_position
```

Sources: internal/nginx_log/indexer/parallel_indexer.go875-920 internal/nginx_log/indexer/rebuild.go323-394


Configuration and Tuning

The indexer provides extensive configuration options with automatic CPU-aware defaults.

Configuration Structure

CPU-Aware Defaults:

Sources: internal/nginx_log/indexer/types.go62-91

Performance Profiles

| Profile | Workers | Batch Size | Use Case |
|---|---|---|---|
| default | CPUs × 3 | 15-25k | Balanced performance |
| high_throughput | CPUs × 4 | 30-50k | Maximum indexing speed |
| low_latency | CPUs × 1.5 | 500 | Real-time updates |
| memory_constrained | CPUs ÷ 2 | 250 | Limited RAM (256MB quota) |
| cpu_intensive | CPUs × 4 | 25-45k | Multi-core optimization |
| max_performance | CPUs × 5 | 40-60k | All resources (2GB quota) |

Sources: internal/nginx_log/indexer/types.go93-170

Runtime Statistics

Sources: internal/nginx_log/indexer/parallel_indexer.go510-539


Error Handling and Recovery

The indexer implements comprehensive error handling with automatic recovery strategies.

Error Classification

Retry Strategy:

```
max_retries  = 3
backoff_base = 1 second

for attempt in 1..max_retries:
    try:
        perform_operation()
        break
    except error:
        if is_transient(error):
            sleep(backoff_base * 2^attempt)
        else:
            break
```

Sources: internal/nginx_log/indexer/persistence.go373-405

Checkpoint-Based Recovery

Recovery Scenarios:

| Scenario | Detection | Recovery |
|---|---|---|
| Process crash | IndexStatus = "indexing" on startup | Resume from LastPosition |
| Disk full | Write error during batch | Reduce batch size, retry |
| Corrupt shard | Health check failure | Recreate shard from scratch |
| Network error | I/O timeout | Exponential backoff retry |

Sources: internal/nginx_log/indexer/parallel_indexer.go1041-1147


Integration with Search System

The indexer coordinates with the searcher through zero-downtime shard swapping.

Shard Hot-Swap Mechanism

Diagram: UpdateSearcherShards() Zero-Downtime Swap

UpdateSearcherShards() Implementation (modern_services.go:484-570):

Function flow:

  1. Schedule an async update to avoid blocking (go updateSearcherShardsAsync())
  2. Wait 500ms for indexing operations to settle
  3. Acquire the write lock (servicesMutex.Lock())
  4. Check servicesInitialized && globalIndexer != nil
  5. Verify indexer.IsHealthy()
  6. Get newShards := globalIndexer.GetAllShards()
  7. If globalSearcher == nil, create the initial searcher with NewSearcher(config, newShards) and the analytics service; otherwise call globalSearcher.SwapShards(newShards), where the IndexAlias performs an atomic pointer swap via Swap()
  8. Verify globalSearcher.IsHealthy() and IsRunning()
  9. Release the lock (servicesMutex.Unlock())

Zero-Downtime Guarantees:

  • Searcher.IndexAlias uses atomic pointer operations (Swap method)
  • In-flight queries complete on old shards without interruption
  • New queries immediately routed to new shards
  • No query failures or downtime during transition
  • Old shards remain referenced until queries complete (GC handles cleanup)

Sources: internal/nginx_log/modern_services.go484-570 internal/nginx_log/searcher/distributed_searcher.go

Health Monitoring

Health Check Methods:

Sources: internal/nginx_log/indexer/parallel_indexer.go542-587


Integration Points

API Endpoints

The indexing system exposes HTTP endpoints (registered in api/nginx_log routes):

| Endpoint | Method | Purpose | Handler Function |
|---|---|---|---|
| /api/nginx_log/rebuild_index | POST | Rebuild all or a single file | RebuildIndex() (index_management.go:64) |
| /api/nginx_log/destroy_index | POST | Delete all index data | DestroyIndex() (index_management.go) |
| /api/nginx_log/list | GET | Get log files with index status | GetList() (nginx_log.go) |
| /api/nginx_log/preflight | GET | Check if logs are indexed | GetLogPreflight() (analytics.go:127) |
| /api/nginx_log/advanced_search | POST | Search indexed logs | AdvancedSearch() (analytics.go) |

Request/Response Examples:

Async Processing:

  • RebuildIndex() returns immediately (200 OK)
  • Actual indexing runs in background goroutine (performAsyncRebuild)
  • Progress tracked via WebSocket events (nginx_log_index_progress)
  • Completion notified via WebSocket (nginx_log_index_complete)

Sources: api/nginx_log/index_management.go64-116

Frontend Integration

The Vue.js frontend integrates with the indexing system through:

  1. Index Status Display - Real-time badge showing indexed/indexing/queued states
  2. Progress Bars - Per-file progress tracking during indexing
  3. Rebuild Controls - Buttons to trigger full or partial rebuilds
  4. WebSocket Subscriptions - Real-time event handling

Key Vue Components (app/src/views/nginx_log/):

| Component | File Path | Purpose | Key Methods |
|---|---|---|---|
| NginxLogList.vue | app/src/views/nginx_log/NginxLogList.vue (1-552) | Log file list with index status columns | checkAdvancedIndexingStatus(), rebuildFileIndex() |
| IndexProgressBar.vue | app/src/views/nginx_log/indexing/components/IndexProgressBar.vue | Per-file progress visualization | Displays progress.percent, progress.stage |
| IndexManagement.vue | app/src/views/nginx_log/indexing/IndexManagement.vue | Bulk rebuild controls (Rebuild All, Destroy All) | handleRebuildAll(), handleDestroyAll() |
| useIndexProgress.ts | app/src/views/nginx_log/composables/useIndexProgress.ts | Pinia store for progress state | getProgressForFile(), isGlobalIndexing |
| StructuredLogViewer.vue | app/src/views/nginx_log/structured/StructuredLogViewer.vue (1-700) | Search interface with preflight checks | checkIfIndexed(), performAdvancedSearch() |

WebSocket Event Handling (via useWebSocketEventBus):

Event Types:

  • processing_status - Global indexing state (true/false)
  • nginx_log_index_ready - Searcher shard swap completed
  • nginx_log_index_progress - Per-file indexing progress (percent, lines, stage)
  • nginx_log_index_complete - File indexing completed (success/error)

Sources: app/src/views/nginx_log/NginxLogList.vue89-135 app/src/views/nginx_log/structured/StructuredLogViewer.vue30-145

Database Schema

The indexing system uses a single table for all metadata tracking:

Table: nginx_log_indexes

Sources: internal/nginx_log/indexer/persistence.go69-125 model/nginx_log_index.go1-50


Operational Considerations

Monitoring and Diagnostics

Index Statistics:

Health Checks:

Sources: internal/nginx_log/indexer/parallel_indexer.go510-539 internal/nginx_log/indexer/parallel_indexer.go580-587

Error Recovery

The system implements automatic recovery for common failure scenarios:

  1. Shard Corruption - Automatically recreates corrupt shards
  2. OOM Protection - Reduces batch size when memory pressure detected
  3. File Read Errors - Retries with exponential backoff
  4. Incomplete Indexing - Resumes from last_position on restart

Retry Strategy:

```
retry_count = 0
max_retries = 3
backoff     = 1 second

while retry_count < max_retries:
    try:
        index_file()
        break
    except error:
        retry_count++
        sleep(backoff * 2^retry_count)

if retry_count == max_retries:
    set_index_status("error")
```

Sources: internal/nginx_log/indexer/persistence.go373-405

Performance Benchmarks

Typical performance on modern hardware (8-core CPU, NVMe SSD):

| Operation | Throughput | Latency |
|---|---|---|
| Parse log lines | 400,000 lines/sec | N/A |
| Index documents | 50,000 docs/sec | N/A |
| Full rebuild (1M docs) | ~20 seconds | N/A |
| Single file rebuild | ~5 seconds | N/A |
| Search query | N/A | 10-50ms |
| Facet aggregation | N/A | 50-200ms |

Factors Affecting Performance:

  • CPU core count (worker parallelism)
  • Disk I/O speed (log file reading)
  • Memory available (batch size, caching)
  • Index size (shard count, segment merging)
  • Network latency (WebSocket events)

Sources: internal/nginx_log/indexer/README.md1-100