45 changes: 32 additions & 13 deletions tests/ducktape/services/kafka.py
@@ -5,10 +5,14 @@
import time
from ducktape.services.service import Service

from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions


class KafkaClient(Service):
"""Kafka client wrapper - assumes external Kafka is running"""

DEFAULT_TIMEOUT = 10

def __init__(self, context, bootstrap_servers="localhost:9092"):
# Use num_nodes=0 since we're not managing any nodes
super(KafkaClient, self).__init__(context, num_nodes=0)
@@ -33,23 +37,20 @@
def verify_connection(self):
"""Verify that Kafka is accessible"""
try:
from confluent_kafka.admin import AdminClient
admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers_str})

Check failure on line 40 in tests/ducktape/services/kafka.py (SonarQube-Confluent / confluent-kafka-python Sonarqube Results): Define a constant instead of duplicating this literal 'bootstrap.servers' 4 times.
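One way this finding could be addressed (a reviewer-style sketch, not part of this PR; the BOOTSTRAP_SERVERS_KEY constant and make_admin_client helper are hypothetical names) is to spell the config key out once and build the AdminClient in a single place:

    from confluent_kafka.admin import AdminClient

    # The config key is defined exactly once, addressing the duplication warning.
    BOOTSTRAP_SERVERS_KEY = 'bootstrap.servers'

    def make_admin_client(bootstrap_servers):
        """Build an AdminClient for the given bootstrap servers string."""
        return AdminClient({BOOTSTRAP_SERVERS_KEY: bootstrap_servers})

Each method could then call make_admin_client(self.bootstrap_servers_str) instead of repeating the literal.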

# Try to get cluster metadata to verify connection
metadata = admin_client.list_topics(timeout=10)
metadata = admin_client.list_topics(timeout=self.DEFAULT_TIMEOUT)
self.logger.info("Successfully connected to Kafka. Available topics: %s",
list(metadata.topics.keys()))
return True
except Exception as e:
self.logger.error("Failed to connect to Kafka at %s: %s", self.bootstrap_servers_str, e)
self.logger.error("Failed to connect to Kafka: %s", e)
return False

def create_topic(self, topic, partitions=1, replication_factor=1):
"""Create a topic using Kafka admin client"""
try:
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers_str})

topic_config = NewTopic(
@@ -75,26 +76,19 @@
else:
self.logger.warning("Failed to create topic %s: %s", topic_name, e)

except ImportError:
self.logger.error("confluent_kafka not available for topic creation")
except Exception as e:
self.logger.error("Failed to create topic %s: %s", topic, e)

def list_topics(self):
"""List all topics using admin client"""
try:
from confluent_kafka.admin import AdminClient

admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers_str})
metadata = admin_client.list_topics(timeout=10)
metadata = admin_client.list_topics(timeout=self.DEFAULT_TIMEOUT)

topics = list(metadata.topics.keys())
self.logger.debug("Available topics: %s", topics)
return topics

except ImportError:
self.logger.error("confluent_kafka not available for listing topics")
return []
except Exception as e:
self.logger.error("Failed to list topics: %s", e)
return []
@@ -124,3 +118,28 @@

self.logger.error("Timeout waiting for topic '%s' after %ds", topic_name, max_wait_time)
return False

def add_partitions(self, topic_name, new_partition_count):
"""Add partitions to an existing topic"""
try:
admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers_str})
metadata = admin_client.list_topics(timeout=self.DEFAULT_TIMEOUT)

if topic_name not in metadata.topics:
raise ValueError(f"Topic {topic_name} does not exist")

current_partitions = len(metadata.topics[topic_name].partitions)
if new_partition_count <= current_partitions:
return # No change needed

# Add partitions
new_partitions = NewPartitions(topic=topic_name, new_total_count=new_partition_count)
fs = admin_client.create_partitions([new_partitions])

# Wait for completion
for topic, f in fs.items():
f.result(timeout=30)

except Exception as e:
self.logger.error("Failed to add partitions to topic %s: %s", topic_name, e)
raise
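A minimal usage sketch of the client with the new method (assumptions: an external broker is reachable at localhost:9092, the code runs inside a ducktape Test so self.test_context is available, the import path mirrors the file location, and the topic and test names are hypothetical):

    from tests.ducktape.services.kafka import KafkaClient

    def test_partition_growth(self):
        # Connects to the externally managed broker, creates a topic, and grows it.
        client = KafkaClient(self.test_context, bootstrap_servers="localhost:9092")
        assert client.verify_connection()
        client.create_topic("ducktape-demo", partitions=1, replication_factor=1)
        client.add_partitions("ducktape-demo", new_partition_count=3)  # 1 -> 3 partitions
        assert "ducktape-demo" in client.list_topics()

Note that add_partitions returns without doing anything when new_partition_count is not greater than the current partition count, and raises if the topic does not exist.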