Skip to content

Commit b12119a

Browse files
committed
Add support for config entries in the topic creation API
1 parent f8eaecc commit b12119a

File tree

6 files changed

+35
-6
lines changed

6 files changed

+35
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ Changes and additions to the library will be listed here.
44

55
## Unreleased
66

7+
- Add support for config entries in the topic creation API.
8+
79
## v0.5.3
810

911
- Add support for the topic deletion API (#528).

lib/kafka/client.rb

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,10 +457,16 @@ def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes
457457
# @param replication_factor [Integer] the replication factor of the topic.
458458
# @param timeout [Integer] a duration of time to wait for the topic to be
459459
# completely created.
460+
# @param config_entries [Hash] topic-level configs to use for the topic.
461+
# See https://kafka.apache.org/documentation/#topicconfigs.
460462
# @raise [Kafka::TopicAlreadyExists] if the topic already exists.
461463
# @return [nil]
462-
def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30)
463-
@cluster.create_topic(name, num_partitions: num_partitions, replication_factor: replication_factor, timeout: timeout)
464+
def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30, config_entries: {})
465+
@cluster.create_topic(name,
466+
num_partitions: num_partitions,
467+
replication_factor: replication_factor,
468+
timeout: timeout,
469+
config_entries: config_entries)
464470
end
465471

466472
# Delete a topic in the cluster.

lib/kafka/cluster.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,13 @@ def partitions_for(topic)
156156
raise
157157
end
158158

159-
def create_topic(name, num_partitions:, replication_factor:, timeout:)
159+
def create_topic(name, num_partitions:, replication_factor:, timeout:, config_entries:)
160160
options = {
161161
topics: {
162162
name => {
163163
num_partitions: num_partitions,
164164
replication_factor: replication_factor,
165+
config_entries: config_entries,
165166
}
166167
},
167168
timeout: timeout,

lib/kafka/protocol/create_topics_request.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ def encode(encoder)
2828
encoder.write_array([])
2929

3030
# Config entries. We don't care.
31-
encoder.write_array([])
31+
encoder.write_array(config.fetch(:config_entries)) do |config_name, config_value|
32+
encoder.write_string(config_name)
33+
encoder.write_string(config_value)
34+
end
3235
end
3336

3437
# Timeout is in ms.

spec/functional/topic_management_spec.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,20 @@
1010
expect(partitions).to eq 3
1111
end
1212

13+
example "creating a topic with config entries" do
14+
unless kafka.supports_api?(Kafka::Protocol::DESCRIBE_CONFIGS_API)
15+
skip("This Kafka version not support ")
16+
end
17+
18+
topic = generate_topic_name
19+
expect(kafka.topics).not_to include(topic)
20+
21+
kafka.create_topic(topic, num_partitions: 3, config_entries: { 'cleanup.policy' => 'compact' })
22+
23+
configs = kafka.describe_topic(topic, %w(cleanup.policy))
24+
expect(configs['cleanup.policy']).to eq('compact')
25+
end
26+
1327
example "deleting topics" do
1428
topic = generate_topic_name
1529

spec/spec_helper.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,11 @@ def create_random_topic(*args)
5959
topic
6060
end
6161

62-
def create_topic(name, num_partitions: 1, num_replicas: 1)
63-
kafka.create_topic(name, num_partitions: num_partitions, replication_factor: num_replicas)
62+
def create_topic(name, num_partitions: 1, num_replicas: 1, config_entries: {})
63+
kafka.create_topic(name,
64+
num_partitions: num_partitions,
65+
replication_factor: num_replicas,
66+
config_entries: config_entries)
6467
end
6568
end
6669

0 commit comments

Comments
 (0)