5
5
import time
6
6
from ducktape .services .service import Service
7
7
8
+ from confluent_kafka .admin import AdminClient , NewTopic , NewPartitions
9
+
8
10
9
11
class KafkaClient (Service ):
10
12
"""Kafka client wrapper - assumes external Kafka is running"""
11
13
14
+ DEFAULT_TIMEOUT = 10
15
+
12
16
def __init__ (self , context , bootstrap_servers = "localhost:9092" ):
13
17
# Use num_nodes=0 since we're not managing any nodes
14
18
super (KafkaClient , self ).__init__ (context , num_nodes = 0 )
@@ -33,23 +37,20 @@ def bootstrap_servers(self):
33
37
def verify_connection (self ):
34
38
"""Verify that Kafka is accessible"""
35
39
try :
36
- from confluent_kafka .admin import AdminClient
37
40
admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
38
41
39
42
# Try to get cluster metadata to verify connection
40
- metadata = admin_client .list_topics (timeout = 10 )
43
+ metadata = admin_client .list_topics (timeout = self . DEFAULT_TIMEOUT )
41
44
self .logger .info ("Successfully connected to Kafka. Available topics: %s" ,
42
45
list (metadata .topics .keys ()))
43
46
return True
44
47
except Exception as e :
45
- self .logger .error ("Failed to connect to Kafka at %s : %s" , self . bootstrap_servers_str , e )
48
+ self .logger .error ("Failed to connect to Kafka: %s" , e )
46
49
return False
47
50
48
51
def create_topic (self , topic , partitions = 1 , replication_factor = 1 ):
49
52
"""Create a topic using Kafka admin client"""
50
53
try :
51
- from confluent_kafka .admin import AdminClient , NewTopic
52
-
53
54
admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
54
55
55
56
topic_config = NewTopic (
@@ -75,26 +76,19 @@ def create_topic(self, topic, partitions=1, replication_factor=1):
75
76
else :
76
77
self .logger .warning ("Failed to create topic %s: %s" , topic_name , e )
77
78
78
- except ImportError :
79
- self .logger .error ("confluent_kafka not available for topic creation" )
80
79
except Exception as e :
81
80
self .logger .error ("Failed to create topic %s: %s" , topic , e )
82
81
83
82
def list_topics (self ):
84
83
"""List all topics using admin client"""
85
84
try :
86
- from confluent_kafka .admin import AdminClient
87
-
88
85
admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
89
- metadata = admin_client .list_topics (timeout = 10 )
86
+ metadata = admin_client .list_topics (timeout = self . DEFAULT_TIMEOUT )
90
87
91
88
topics = list (metadata .topics .keys ())
92
89
self .logger .debug ("Available topics: %s" , topics )
93
90
return topics
94
91
95
- except ImportError :
96
- self .logger .error ("confluent_kafka not available for listing topics" )
97
- return []
98
92
except Exception as e :
99
93
self .logger .error ("Failed to list topics: %s" , e )
100
94
return []
@@ -124,3 +118,28 @@ def wait_for_topic(self, topic_name, max_wait_time=30, initial_wait=0.1):
124
118
125
119
self .logger .error ("Timeout waiting for topic '%s' after %ds" , topic_name , max_wait_time )
126
120
return False
121
+
122
+ def add_partitions (self , topic_name , new_partition_count ):
123
+ """Add partitions to an existing topic"""
124
+ try :
125
+ admin_client = AdminClient ({'bootstrap.servers' : self .bootstrap_servers_str })
126
+ metadata = admin_client .list_topics (timeout = self .DEFAULT_TIMEOUT )
127
+
128
+ if topic_name not in metadata .topics :
129
+ raise ValueError (f"Topic { topic_name } does not exist" )
130
+
131
+ current_partitions = len (metadata .topics [topic_name ].partitions )
132
+ if new_partition_count <= current_partitions :
133
+ return # No change needed
134
+
135
+ # Add partitions
136
+ new_partitions = NewPartitions (topic = topic_name , new_total_count = new_partition_count )
137
+ fs = admin_client .create_partitions ([new_partitions ])
138
+
139
+ # Wait for completion
140
+ for topic , f in fs .items ():
141
+ f .result (timeout = 30 )
142
+
143
+ except Exception as e :
144
+ self .logger .error ("Failed to add partitions to topic %s: %s" , topic_name , e )
145
+ raise
0 commit comments