Lagat Josiah

Understanding Kafka Lag


What is Kafka Lag?

Kafka lag is the difference between the latest message offset in a topic partition and the current offset of a consumer. It represents the number of messages produced but not yet consumed—essentially, how far behind your consumer is from real-time.
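The definition above reduces to a per-partition subtraction. The following is a minimal sketch in plain Python, assuming the offsets are already available as dictionaries keyed by (topic, partition). The function name `compute_lag` and the sample offsets are illustrative and not part of the system described in this article.

```python
def compute_lag(end_offsets, committed_offsets):
    """Return per-partition lag: log-end offset minus the consumer's offset.

    Both arguments map (topic, partition) -> offset. A partition with no
    committed offset yet is treated as starting from 0 (an assumption made
    for this sketch; a real consumer follows auto_offset_reset instead).
    """
    lag = {}
    for tp, end in end_offsets.items():
        consumed = committed_offsets.get(tp, 0)
        lag[tp] = max(end - consumed, 0)
    return lag


# Hypothetical offsets, for illustration only
end_offsets = {("btc", 0): 1200, ("eth", 0): 1180, ("sol", 0): 1195}
committed = {("btc", 0): 1150, ("eth", 0): 1180}

lag = compute_lag(end_offsets, committed)
total_lag = sum(lag.values())
```

Here the BTC consumer is 50 messages behind, ETH is fully caught up, and SOL has consumed nothing yet.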


Real-World Example: Cryptocurrency Price Streaming System

Let's examine a practical system that streams cryptocurrency prices:

Architecture:

  • Producer: Fetches BTC, ETH, and SOL prices from Binance API every 3 seconds
  • Kafka Topics: Three topics (btc, eth, sol) receiving price updates
  • Consumer: Reads from all topics and persists to MongoDB Atlas
  • Infrastructure: Docker-based Kafka cluster with Kafdrop monitoring

Current Implementation:

    # producer.py - Sends 3 messages every 3 seconds (1 msg/sec)
    while True:
        for r in btc():
            producer.send("btc", r)
        for r in eth():
            producer.send("eth", r)
        for r in sol():
            producer.send("sol", r)
        time.sleep(3)

    # consumer.py - Processes messages sequentially
    for msg in consumer:
        topic = msg.topic
        value = msg.value
        if topic == 'btc':
            col.btc.insert_one(value)  # Blocking MongoDB write
        elif topic == 'eth':
            col.eth.insert_one(value)
        elif topic == 'sol':
            col.sol.insert_one(value)

Root Causes of Lag in This System

1. MongoDB Network Latency (Primary Bottleneck)

The consumer writes to MongoDB Atlas (cloud database) for every single message:

  • Each insert_one() makes a network round-trip (~50-200ms)
  • TLS handshake overhead on each new connection (a pooled connection pays this once, not on every insert)
  • Geographic distance between consumer and MongoDB cluster
  • Impact: Consumer spends most of its time waiting for network I/O

Calculation:

  • Producer rate: 1 message/second
  • Consumer processing: 50-200ms/message (network bound)
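  • Result: From write latency alone the consumer can handle 5-20 msg/sec; in practice synchronous blocking holds it to an estimated ~3 msg/sec. That is still above the 1 msg/sec producer rate, so lag builds mainly when the consumer stalls, crashes, or restarts and then has to drain the backlog at this limited rate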
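The arithmetic behind these figures can be sketched as follows. The helper names and the 10-minute outage are assumptions chosen for illustration. The 3 msg/sec consume rate and 1 msg/sec produce rate are the article's estimates.

```python
def max_throughput(latency_ms):
    """Messages per second a strictly sequential consumer can sustain."""
    return 1000.0 / latency_ms


def drain_time_seconds(backlog, consume_rate, produce_rate):
    """Seconds needed to clear a backlog while new messages keep arriving.

    Returns None when the consumer is not faster than the producer,
    because the backlog then never shrinks.
    """
    surplus = consume_rate - produce_rate
    if surplus <= 0:
        return None
    return backlog / surplus


best_case = max_throughput(50)    # 50 ms per write
worst_case = max_throughput(200)  # 200 ms per write

# Hypothetical outage: 10 minutes offline at 1 msg/sec leaves 600 messages
backlog = 600
catch_up = drain_time_seconds(backlog, consume_rate=3.0, produce_rate=1.0)
```

With these numbers the consumer needs 300 seconds to catch up after the outage, since only 2 msg/sec of its capacity goes toward the backlog.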

2. Synchronous Blocking Processing

    for msg in consumer:
        col.btc.insert_one(msg.value)  # Consumer blocks here for 50-200ms
        # Next message can't be processed until this completes

The consumer cannot fetch or process the next message while waiting for MongoDB to acknowledge the write.

3. No Error Handling

    # What happens when MongoDB is unreachable?
    col.btc.insert_one(value)  # Exception crashes consumer
    # Lag accumulates until manual restart

Network failures, MongoDB timeouts, or connection issues crash the consumer immediately. During downtime:

  • Producer continues writing to Kafka
  • Consumer is offline
  • Lag grows unbounded until someone notices and restarts

4. Auto-Commit with Blocking I/O

    consumer = KafkaConsumer(
        enable_auto_commit=True,  # Commits offsets every 5 seconds by default
        ...
    )

The problem:

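  • Offsets are auto-committed periodically (every 5 seconds by default), regardless of whether the MongoDB write succeeded
  • If an offset is committed and the consumer then crashes before the corresponding write is persisted, that message is never written
  • On restart the consumer resumes from the committed offset, so the message is skipped and the data is lost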
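The ordering problem can be made concrete with a small simulation. This is a sketch in plain Python with no Kafka or MongoDB involved. The function `run_consumer` and the message names are invented for illustration.

```python
def run_consumer(messages, commit_before_write, crash_at):
    """Simulate one consumer run that crashes while handling index crash_at.

    Returns (stored, committed_offset): the messages that reached the
    database and the offset a restarted consumer would resume from.
    """
    stored = []
    committed_offset = 0
    for offset, msg in enumerate(messages):
        if commit_before_write:
            committed_offset = offset + 1  # offset advanced first
        if offset == crash_at:
            break                          # crash before the write lands
        stored.append(msg)
        if not commit_before_write:
            committed_offset = offset + 1  # commit only after the write
    return stored, committed_offset


messages = ["m0", "m1", "m2", "m3"]

# Offset committed first: the restart resumes at 3, so "m2" is never stored
stored_a, resume_a = run_consumer(messages, commit_before_write=True, crash_at=2)
after_restart_a = stored_a + messages[resume_a:]

# Commit after write: the restart resumes at 2, so "m2" is re-read and stored
stored_b, resume_b = run_consumer(messages, commit_before_write=False, crash_at=2)
after_restart_b = stored_b + messages[resume_b:]
```

Committing after the write closes the gap, at the cost of possibly re-reading a message that was written but not yet committed.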

5. Single Consumer, Multiple Topics

    consumer = KafkaConsumer("btc", "eth", "sol", ...)  # One consumer for all topics

One consumer processes three topics sequentially:

  • No parallelism across topics
  • BTC processing blocks ETH and SOL processing
  • Can't scale individual topics independently

Methods for Reducing and Eliminating Lag

Method 1: Batch Inserts

Problem: Individual insert_one() calls make 1 network round-trip per message.

Solution: Accumulate messages and use insert_many() for bulk writes.

    import json
    import time
    from collections import defaultdict

    from kafka import KafkaConsumer
    from pymongo import MongoClient

    consumer = KafkaConsumer(
        "btc", "eth", "sol",
        bootstrap_servers=['localhost:9094'],
        value_deserializer=lambda m: json.loads(m.decode()),
        group_id="crypto-consumer-group",  # Required for consumer.commit()
        enable_auto_commit=False,          # Manual control
        max_poll_records=100,              # Fetch more messages per poll
        fetch_min_bytes=1024,
        fetch_max_wait_ms=500
    )

    mongo = MongoClient(
        "mongodb+srv://kimtryx_db_user:3100@cluster0.nds95yl.mongodb.net/",
        maxPoolSize=50,  # Connection pooling
        retryWrites=True
    )
    col = mongo.fx

    BATCH_SIZE = 50
    BATCH_TIMEOUT = 5
    batches = defaultdict(list)
    last_commit = time.time()

    for msg in consumer:
        batches[msg.topic].append(msg.value)

        # Flush when a batch is full or the timeout is reached
        batch_ready = any(len(b) >= BATCH_SIZE for b in batches.values())
        timeout_reached = (time.time() - last_commit) >= BATCH_TIMEOUT

        if batch_ready or timeout_reached:
            for topic, batch in batches.items():
                if batch:
                    getattr(col, topic).insert_many(batch)
                    print(f"Inserted {len(batch)} documents into {topic}")
            consumer.commit()  # Commit only after successful writes
            batches.clear()
            last_commit = time.time()

Performance Improvement:

  • Before: 1 network call per message = ~3 msg/sec
  • After: 1 network call per 50 messages = up to ~150 msg/sec, assuming a 50-document bulk insert takes about as long as a single insert
  • Speedup: up to 50x throughput capacity

Why it works:

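  • MongoDB handles one bulk insert far more efficiently than the same number of individual inserts
  • Network overhead amortized across multiple documents
  • Committing only after a successful write gives at-least-once delivery: no messages are skipped, though a crash between the write and the commit can replay a batch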
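A simple cost model shows where the batching gain comes from and why 50x is an upper bound. The sketch below assumes each insert call costs one fixed round-trip plus a small per-document cost. The 300 ms and 1 ms figures are placeholders chosen to match the ~3 msg/sec baseline, not measurements.

```python
def batched_throughput(batch_size, round_trip_ms, per_doc_ms):
    """Messages per second when each batch costs one round-trip plus
    a per-document cost."""
    batch_time_ms = round_trip_ms + per_doc_ms * batch_size
    return batch_size * 1000.0 / batch_time_ms


# Assumed costs for illustration: 300 ms round-trip, 1 ms extra per document
single = batched_throughput(1, round_trip_ms=300, per_doc_ms=1)
batched = batched_throughput(50, round_trip_ms=300, per_doc_ms=1)
speedup = batched / single
```

Under these assumptions the speedup is about 43x. It only reaches the full 50x when the per-document cost is zero.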

Method 2: Asynchronous Processing

Problem: Consumer blocks on MongoDB writes, can't fetch new messages.

Solution: Decouple Kafka consumption from MongoDB persistence using queues and worker threads.

    from collections import defaultdict
    from queue import Queue, Empty
    import threading
    import time

    # consumer and col are created as in Method 1
    message_queue = Queue(maxsize=1000)
    shutdown_flag = threading.Event()

    def batch_writer_worker():
        """Background thread writes to MongoDB"""
        batches = defaultdict(list)
        BATCH_SIZE = 50
        BATCH_TIMEOUT = 3
        last_write = time.time()

        while not shutdown_flag.is_set():
            try:
                msg = message_queue.get(timeout=1)
                if msg is None:  # Shutdown signal
                    break
                batches[msg['topic']].append(msg['value'])
            except Empty:
                pass  # Idle second: fall through so the timeout flush still runs

            try:
                # Write when ready
                batch_ready = any(len(b) >= BATCH_SIZE for b in batches.values())
                timeout_reached = (time.time() - last_write) >= BATCH_TIMEOUT

                if batch_ready or timeout_reached:
                    for topic, batch in batches.items():
                        if batch:
                            getattr(col, topic).insert_many(batch)
                            print(f"Worker: Inserted {len(batch)} into {topic}")
                    batches.clear()
                    last_write = time.time()
            except Exception as e:
                print(f"Worker error: {e}")

    # Start background worker
    worker = threading.Thread(target=batch_writer_worker, daemon=True)
    worker.start()

    # Main consumer loop
    COMMIT_INTERVAL = 100
    batch_count = 0

    for msg in consumer:
        # Returns immediately unless the queue is full (then put() blocks)
        message_queue.put({'topic': msg.topic, 'value': msg.value})

        batch_count += 1
        if batch_count >= COMMIT_INTERVAL:
            # Caveat: this commits messages that may still be waiting in the
            # queue, so a crash before the worker writes them loses data
            consumer.commit()
            batch_count = 0
            print(f"Queue size: {message_queue.qsize()}")

Performance Improvement:

  • Before: Consumer blocked 50-200ms per message
  • After: Consumer runs at full network speed, worker handles writes in parallel
  • Benefit: Queue buffers traffic bursts, eliminates blocking

Trade-offs:

  • Increased memory usage (in-memory queue)
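  • More complex error handling: offsets committed while messages are still in the queue are lost if the process crashes before the worker writes them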
  • Slight increase in end-to-end latency (acceptable for most use cases)
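One way to contain the error-handling complexity is to let the worker report what it has actually written, and have the consumer thread commit only those offsets. The main loop in the example above commits after enqueueing, not after writing. The class below is a hypothetical helper in plain Python, not part of kafka-python. It assumes each queued item also carries the message's partition and offset (available as `msg.partition` and `msg.offset` on a consumed record).

```python
import threading


class WrittenOffsets:
    """Tracks, per partition, the next offset that is safe to commit.

    The worker calls mark_written() after a successful insert; the
    consumer thread calls snapshot() and commits only those offsets.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._next_offset = {}

    def mark_written(self, topic, partition, offset):
        with self._lock:
            key = (topic, partition)
            # Kafka commits the offset of the NEXT message to read
            current = self._next_offset.get(key, 0)
            self._next_offset[key] = max(current, offset + 1)

    def snapshot(self):
        with self._lock:
            return dict(self._next_offset)


tracker = WrittenOffsets()
tracker.mark_written("btc", 0, 41)
tracker.mark_written("btc", 0, 42)
tracker.mark_written("eth", 0, 7)
safe_to_commit = tracker.snapshot()
```

The snapshot would then be translated into the explicit offsets argument of `consumer.commit()`. That conversion is left out here because its exact shape depends on the client library version.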

Method 3: Error Handling and Resilience

Problem: Any error crashes the consumer, causing lag accumulation.

Solution: Implement retry logic, graceful degradation, and automatic recovery.

    import logging
    import time
    from collections import defaultdict

    from pymongo import MongoClient
    from pymongo.errors import BulkWriteError, ConnectionFailure

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    def create_mongo_client():
        """Create MongoDB client with retry logic"""
        while True:
            try:
                client = MongoClient(
                    "mongodb+srv://...",
                    maxPoolSize=50,
                    retryWrites=True,
                    serverSelectionTimeoutMS=5000
                )
                client.admin.command('ping')  # Test connection
                logger.info("✓ Connected to MongoDB")
                return client
            except Exception as e:
                logger.error(f"MongoDB connection failed: {e}, retrying in 5s...")
                time.sleep(5)

    def insert_batch_with_retry(collection, batch, topic_name):
        """Insert with exponential backoff"""
        MAX_RETRIES = 3
        for attempt in range(MAX_RETRIES):
            try:
                collection.insert_many(batch, ordered=False)
                logger.info(f"✓ Inserted {len(batch)} docs into {topic_name}")
                return True
            except BulkWriteError as e:
                # Partial success - some docs inserted (failures are only logged)
                logger.warning(f"Partial insert: {e.details}")
                return True
            except ConnectionFailure:
                wait = 2 ** attempt
                logger.error(f"Connection failed (attempt {attempt+1}), retry in {wait}s")
                time.sleep(wait)
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                return False
        logger.error(f"Failed after {MAX_RETRIES} attempts")
        return False

    mongo = create_mongo_client()
    col = mongo.fx
    batches = defaultdict(list)
    last_flush = time.time()

    try:
        # consumer, BATCH_SIZE and BATCH_TIMEOUT are defined as in Method 1
        for msg in consumer:
            batches[msg.topic].append(msg.value)
            batch_ready = any(len(b) >= BATCH_SIZE for b in batches.values())
            timeout_reached = (time.time() - last_flush) >= BATCH_TIMEOUT

            if batch_ready or timeout_reached:
                all_success = True
                for topic, batch in batches.items():
                    if not batch:
                        continue
                    if insert_batch_with_retry(getattr(col, topic), batch, topic):
                        batch.clear()  # Drop only what was written
                    else:
                        all_success = False

                if all_success:
                    consumer.commit()
                else:
                    logger.warning("Skipping commit due to failures; failed batches are kept for the next flush")
                last_flush = time.time()

    except KeyboardInterrupt:
        logger.info("Graceful shutdown...")
        # Flush remaining batches
        for topic, batch in batches.items():
            if batch:
                insert_batch_with_retry(getattr(col, topic), batch, topic)
        consumer.commit()
    finally:
        consumer.close()
        mongo.close()

Benefits:

  • Automatic recovery from transient network failures
  • No data loss from crashes
  • Exponential backoff prevents overwhelming failed services
  • Graceful shutdown ensures no message loss
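The backoff pattern itself is independent of MongoDB. The sketch below isolates it as a generic helper with an injectable `sleep` so it can be exercised without real delays. The name `retry_with_backoff` and the built-in `ConnectionError` (standing in for pymongo's `ConnectionFailure`) are assumptions for illustration.

```python
import time


def retry_with_backoff(operation, max_retries=3, base_delay=1.0,
                       retry_on=(ConnectionError,), sleep=time.sleep):
    """Call operation() until it succeeds or max_retries attempts fail.

    Waits base_delay * 2**attempt seconds between attempts. Exceptions
    not listed in retry_on propagate immediately.
    """
    for attempt in range(max_retries):
        try:
            return operation()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)


# Demonstration with a fake operation that fails twice, then succeeds
calls = {"count": 0}
delays = []


def flaky_write():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated network failure")
    return "ok"


result = retry_with_backoff(flaky_write, sleep=delays.append)
```

Passing `delays.append` as the sleep function records the waits (1 s, then 2 s) instead of actually pausing.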

Method 4: Horizontal Scaling with Multiple Consumers

Problem: Single consumer can't parallelize across topics.

Solution: Deploy separate consumer instances for each topic.

    # consumer_btc.py
    import json

    from kafka import KafkaConsumer
    from pymongo import MongoClient

    consumer = KafkaConsumer(
        "btc",
        bootstrap_servers=['localhost:9094'],
        value_deserializer=lambda m: json.loads(m.decode()),
        enable_auto_commit=False,
        group_id="crypto-consumer-group"
    )

    mongo = MongoClient("mongodb+srv://...")
    col = mongo.fx.btc

    BATCH_SIZE = 50
    batch = []

    for msg in consumer:
        batch.append(msg.value)
        if len(batch) >= BATCH_SIZE:
            col.insert_many(batch)
            consumer.commit()
            print(f"BTC: Inserted {len(batch)} documents")
            batch.clear()

Deployment:

    # Run three separate processes
    python consumer_btc.py &
    python consumer_eth.py &
    python consumer_sol.py &

Benefits:

  • True parallel processing across topics
  • Each consumer optimized for its specific topic
  • Fault isolation (BTC consumer crash doesn't affect ETH/SOL)
  • Scale individual topics based on their load
  • Simplified code per consumer

Scaling further:

  • Increase partitions per topic (e.g., btc-0, btc-1, btc-2)
  • Run multiple consumers per topic in the same consumer group (Kafka balances partitions across them; consumers beyond the partition count sit idle)
  • Deploy in Kubernetes with HPA based on lag metrics
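The partition count caps how far a consumer group can scale. The following is a simplified round-robin sketch of that constraint in plain Python. It is a stand-in for Kafka's real group assignment, and the consumer names are invented.

```python
def assign_partitions(partitions, consumers):
    """Spread partitions across consumers round-robin.

    A simplified stand-in for Kafka's group assignment: each partition
    goes to exactly one consumer, so extra consumers receive nothing.
    """
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


three_partitions = [0, 1, 2]

two_consumers = assign_partitions(three_partitions, ["c1", "c2"])
four_consumers = assign_partitions(three_partitions, ["c1", "c2", "c3", "c4"])
idle = [c for c, parts in four_consumers.items() if not parts]
```

With three partitions, a fourth consumer gets nothing to read, so adding consumers only helps after adding partitions.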

Method 5: Consumer Configuration Optimization

Problem: Default Kafka consumer settings aren't optimized for high throughput.

Solution: Tune configuration parameters for your workload.

    consumer = KafkaConsumer(
        "btc", "eth", "sol",
        bootstrap_servers=['localhost:9094'],
        value_deserializer=lambda m: json.loads(m.decode()),
        enable_auto_commit=False,
        auto_offset_reset="earliest",

        # Fetch optimization
        max_poll_records=500,         # Fetch more messages per poll
        fetch_min_bytes=10240,        # Wait for 10KB before returning
        fetch_max_wait_ms=500,        # Or wait max 500ms

        # Prevent rebalancing
        session_timeout_ms=30000,     # 30s before considered dead
        heartbeat_interval_ms=3000,   # Send heartbeat every 3s
        max_poll_interval_ms=300000,  # 5 min max processing time

        # Connection management
        connections_max_idle_ms=540000,
        request_timeout_ms=30000
    )

Parameter explanations:

Parameter | Purpose | Impact
max_poll_records | Messages fetched per poll() | Higher = fewer network calls
fetch_min_bytes | Minimum data before return | Reduces small fetches
fetch_max_wait_ms | Max wait for fetch_min_bytes | Balances latency vs throughput
session_timeout_ms | Timeout before rebalance | Prevents unnecessary rebalances
max_poll_interval_ms | Max processing time | Allows longer batch processing

Tuning strategy:

  • High throughput: Increase max_poll_records, fetch_min_bytes
  • Low latency: Decrease fetch_max_wait_ms
  • Long processing: Increase max_poll_interval_ms
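The timeout settings interact, so it can help to sanity-check them together before deploying. The function below is a hypothetical helper, not a kafka-python API. It encodes two rules of thumb: keep the heartbeat interval at or below a third of the session timeout, and keep batch processing time under `max_poll_interval_ms`. The `expected_batch_ms` key is invented for this sketch.

```python
def check_consumer_timeouts(config):
    """Return warnings for timeout settings that commonly cause
    avoidable rebalances. All values are in milliseconds."""
    warnings = []
    session = config["session_timeout_ms"]
    heartbeat = config["heartbeat_interval_ms"]
    max_poll = config["max_poll_interval_ms"]
    batch_budget = config.get("expected_batch_ms", 0)  # hypothetical key

    if heartbeat >= session:
        warnings.append("heartbeat_interval_ms must be lower than session_timeout_ms")
    elif heartbeat > session / 3:
        warnings.append("heartbeat_interval_ms is usually kept at or below 1/3 of session_timeout_ms")
    if batch_budget >= max_poll:
        warnings.append("batch processing time can exceed max_poll_interval_ms")
    return warnings


article_settings = {
    "session_timeout_ms": 30000,
    "heartbeat_interval_ms": 3000,
    "max_poll_interval_ms": 300000,
    "expected_batch_ms": 5000,  # assumed: roughly one 5 s batch timeout
}
risky_settings = dict(article_settings, heartbeat_interval_ms=15000)

ok_warnings = check_consumer_timeouts(article_settings)
risky_warnings = check_consumer_timeouts(risky_settings)
```

The settings from this section pass both checks, while a 15 s heartbeat against a 30 s session timeout is flagged.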

Method 6: Monitoring and Observability

Problem: Can't detect lag until it causes visible issues.

Solution: Implement proactive monitoring and alerting.

    import logging
    import threading
    import time

    from kafka import KafkaConsumer, TopicPartition

    logger = logging.getLogger(__name__)

    def check_consumer_lag(monitor, topics):
        """Calculate and report lag for all partitions"""
        lag_data = {}

        for topic in topics:
            partitions = monitor.partitions_for_topic(topic) or set()

            for partition in partitions:
                tp = TopicPartition(topic, partition)

                # Last offset committed by the group (None if nothing committed yet)
                position = monitor.committed(tp) or 0

                # Latest available offset
                end_offset = monitor.end_offsets([tp])[tp]

                # Calculate lag
                current_lag = end_offset - position
                lag_data[f"{topic}-{partition}"] = {
                    'position': position,
                    'end_offset': end_offset,
                    'lag': current_lag
                }

                # Alert on high lag
                if current_lag > 1000:
                    logger.warning(f"⚠️ HIGH LAG: {topic}[{partition}] = {current_lag}")
                else:
                    logger.info(f"{topic}[{partition}] lag: {current_lag}")

        return lag_data

    # Monitor periodically
    def monitoring_loop():
        # KafkaConsumer is not thread-safe, so the monitor thread uses its own
        # instance in the same group instead of sharing the processing consumer
        monitor = KafkaConsumer(
            bootstrap_servers=['localhost:9094'],
            group_id="crypto-consumer-group",
            enable_auto_commit=False
        )
        while True:
            check_consumer_lag(monitor, ["btc", "eth", "sol"])
            time.sleep(30)  # Check every 30 seconds

    monitor_thread = threading.Thread(target=monitoring_loop, daemon=True)
    monitor_thread.start()

Metrics to track:

  • Consumer lag: Messages behind per partition
  • Consumption rate: Messages processed per second
  • Processing time: Time spent per message
  • Error rate: Failed processing attempts
  • Queue depth: For async processing implementations
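Consumption rate is the metric that turns a lag snapshot into a trend: lag grows whenever the produce rate exceeds the consume rate. The following is a minimal sliding-window tracker in plain Python. The class name and the window size are assumptions, and timestamps are passed in explicitly so the example does not depend on the clock.

```python
from collections import deque


class RateTracker:
    """Sliding-window message rate over the last window_seconds."""

    def __init__(self, window_seconds=60.0):
        self.window = window_seconds
        self.samples = deque()  # (timestamp, message_count)

    def record(self, timestamp, count=1):
        self.samples.append((timestamp, count))
        # Drop samples that have fallen out of the window
        while self.samples and self.samples[0][0] <= timestamp - self.window:
            self.samples.popleft()

    def rate(self):
        """Messages per second over the window."""
        return sum(count for _, count in self.samples) / self.window


tracker = RateTracker(window_seconds=10.0)
for second in range(20):  # simulate 3 messages per second for 20 seconds
    tracker.record(float(second), count=3)

consume_rate = tracker.rate()
produce_rate = 1.0  # the article's producer rate
lag_growth_per_sec = produce_rate - consume_rate
```

A negative growth value means the consumer is draining its backlog. A value that stays positive is the early warning that lag will keep rising.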

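Alerting on lag with Prometheus (via a Kafka metrics exporter):

    # prometheus.yml - scrape an exporter, not the broker itself
    # (port 9092 speaks the Kafka protocol and serves no HTTP metrics)
    scrape_configs:
      - job_name: 'kafka'
        static_configs:
          - targets: ['kafka-exporter:9308']  # Assumes a kafka_exporter container on its default port

    # alert_rules.yml - loaded through rule_files in prometheus.yml
    groups:
      - name: kafka_lag
        rules:
          - alert: HighConsumerLag
            expr: kafka_consumergroup_lag > 1000  # Metric name depends on the exporter used
            for: 5m
            annotations:
              summary: "Consumer lag exceeds 1000 messages"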

Performance Comparison

Implementation | Throughput | Latency | Complexity | Reliability
Original (insert_one) | 3 msg/sec | 50-200ms | Low | Low (crashes)
Batch inserts | 150 msg/sec | 5s | Low | Medium
Async + batching | 500+ msg/sec | 3-5s | Medium | High
Multiple consumers | 1500+ msg/sec | 3-5s | Medium | Very High
All optimizations | 5000+ msg/sec | 3-5s | High | Very High

Implementation Strategy

Step 1: Batch Inserts (Immediate Impact)

Start with batch inserts—low complexity, massive improvement:

    batches[topic].append(value)
    if len(batches[topic]) >= BATCH_SIZE:
        collection.insert_many(batches[topic])
        consumer.commit()
        batches[topic].clear()  # Start a fresh batch after a successful write

Expected: up to ~50x throughput increase

Step 2: Error Handling (Stability)

Add retry logic and graceful shutdown:

    def insert_with_retry(collection, batch):
        for attempt in range(3):
            try:
                collection.insert_many(batch)
                return True
            except ConnectionFailure:
                time.sleep(2 ** attempt)
        return False

Expected: Eliminate crash-related lag

Step 3: Configuration Tuning (Easy Wins)

Optimize consumer settings:

    max_poll_records=500,
    fetch_min_bytes=10240,
    session_timeout_ms=30000

Expected: 2-3x additional throughput

Step 4: Async Processing (Advanced)

When Steps 1-3 aren't enough:

    message_queue.put(msg)  # Returns immediately unless the queue is full
    # Worker thread handles MongoDB

Expected: Handle traffic bursts, eliminate blocking

Step 5: Horizontal Scaling (Production-Grade)

Deploy multiple consumer instances:

    docker-compose up -d --scale consumer=3

Expected: Near-linear scaling with consumer count, up to the number of partitions per topic

Step 6: Monitoring (Operational Excellence)

Implement lag tracking and alerts:

    if lag > THRESHOLD:
        alert_ops_team()
        auto_scale_consumers()

Expected: Proactive lag management


Key Takeaways

  1. Network I/O is typically the bottleneck in Kafka consumers—batch operations amortize this cost
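  2. Manual offset commits after successful processing give at-least-once delivery and prevent data loss; duplicates remain possible after a crash, so exactly-once behavior additionally needs idempotent writes or transactions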
  3. Error handling and retries are essential for production systems—transient failures are inevitable
  4. Asynchronous processing decouples consumption from persistence, enabling higher throughput
  5. Horizontal scaling provides near-linear performance improvements (up to the partition count) and fault tolerance
  6. Monitoring enables proactive management—detect and resolve lag before it impacts SLAs
  7. Configuration tuning can double or triple throughput with minimal code changes

The cryptocurrency price streaming system demonstrates how common patterns—synchronous processing, individual writes, poor error handling—create lag. By applying these methods systematically, you can transform a 3 msg/sec system struggling with lag into a 1000+ msg/sec production-grade pipeline that handles real-time data reliably.
