Skip to content

Commit 2266dc7

Browse files
committed
Clean up topic create configs
1 parent bb61cf2 commit 2266dc7

File tree

5 files changed

+26
-19
lines changed

5 files changed

+26
-19
lines changed

lib/kafka/client.rb

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -451,22 +451,32 @@ def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes
451451

452452
# Creates a topic in the cluster.
453453
#
454+
# @example Creating a topic with log compaction
455+
# # Enable log compaction:
456+
# config = { "cleanup.policy" => "compact" }
457+
#
458+
# # Create the topic:
459+
# kafka.create_topic("dns-mappings", config: config)
460+
#
454461
# @param name [String] the name of the topic.
455462
# @param num_partitions [Integer] the number of partitions that should be created
456463
# in the topic.
457464
# @param replication_factor [Integer] the replication factor of the topic.
458465
# @param timeout [Integer] a duration of time to wait for the topic to be
459466
# completely created.
460-
# @param config_entries [Hash] topic-level configs to use for the topic.
461-
# See https://kafka.apache.org/documentation/#topicconfigs.
467+
# @param config [Hash] topic configuration entries. See
468+
# [the Kafka documentation](https://kafka.apache.org/documentation/#topicconfigs)
469+
# for more information.
462470
# @raise [Kafka::TopicAlreadyExists] if the topic already exists.
463471
# @return [nil]
464-
def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30, config_entries: {})
465-
@cluster.create_topic(name,
466-
num_partitions: num_partitions,
467-
replication_factor: replication_factor,
468-
timeout: timeout,
469-
config_entries: config_entries)
472+
def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30, config: {})
473+
@cluster.create_topic(
474+
name,
475+
num_partitions: num_partitions,
476+
replication_factor: replication_factor,
477+
timeout: timeout,
478+
config: config,
479+
)
470480
end
471481

472482
# Delete a topic in the cluster.

lib/kafka/cluster.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,13 @@ def partitions_for(topic)
156156
raise
157157
end
158158

159-
def create_topic(name, num_partitions:, replication_factor:, timeout:, config_entries:)
159+
def create_topic(name, num_partitions:, replication_factor:, timeout:, config:)
160160
options = {
161161
topics: {
162162
name => {
163163
num_partitions: num_partitions,
164164
replication_factor: replication_factor,
165-
config_entries: config_entries,
165+
config: config,
166166
}
167167
},
168168
timeout: timeout,

lib/kafka/protocol/create_topics_request.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def encode(encoder)
2828
encoder.write_array([])
2929

3030
# Config entries. We don't care.
31-
encoder.write_array(config.fetch(:config_entries)) do |config_name, config_value|
31+
encoder.write_array(config.fetch(:config)) do |config_name, config_value|
3232
encoder.write_string(config_name)
3333
encoder.write_string(config_value)
3434
end

spec/functional/topic_management_spec.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
topic = generate_topic_name
1919
expect(kafka.topics).not_to include(topic)
2020

21-
kafka.create_topic(topic, num_partitions: 3, config_entries: { 'cleanup.policy' => 'compact' })
21+
kafka.create_topic(topic, num_partitions: 3, config: { "cleanup.policy" => "compact" })
2222

23-
configs = kafka.describe_topic(topic, %w(cleanup.policy))
24-
expect(configs['cleanup.policy']).to eq('compact')
23+
configs = kafka.describe_topic(topic, ["cleanup.policy"])
24+
expect(configs.fetch("cleanup.policy")).to eq("compact")
2525
end
2626

2727
example "deleting topics" do

spec/spec_helper.rb

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,8 @@ def create_random_topic(*args)
5959
topic
6060
end
6161

62-
def create_topic(name, num_partitions: 1, num_replicas: 1, config_entries: {})
63-
kafka.create_topic(name,
64-
num_partitions: num_partitions,
65-
replication_factor: num_replicas,
66-
config_entries: config_entries)
62+
def create_topic(name, num_partitions: 1, num_replicas: 1)
63+
kafka.create_topic(name, num_partitions: num_partitions, replication_factor: num_replicas)
6764
end
6865
end
6966

0 commit comments

Comments
 (0)