7 changes: 6 additions & 1 deletion lib/kafka/client.rb
@@ -336,6 +336,9 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
# @param fetcher_max_queue_size [Integer] max number of items in the fetch queue that
# are stored for further processing. Note, that each item in the queue represents a
# response from a single broker.
# @param refresh_topic_interval [Integer] interval, in seconds, at which to refresh the topic list.
# If it is 0 (the default), the topic list won't be refreshed.
# If it is n (n > 0), the topic list will be refreshed every n seconds.
# @return [Consumer]
def consumer(
group_id:,
@@ -345,7 +348,8 @@ def consumer(
offset_commit_threshold: 0,
heartbeat_interval: 10,
offset_retention_time: nil,
fetcher_max_queue_size: 100
fetcher_max_queue_size: 100,
refresh_topic_interval: 0
)
cluster = initialize_cluster

@@ -399,6 +403,7 @@ def consumer(
fetcher: fetcher,
session_timeout: session_timeout,
heartbeat: heartbeat,
refresh_topic_interval: refresh_topic_interval
)
end

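As a usage illustration of the new option (not part of the diff; the broker address, client id, group id, and 60-second interval below are assumed values), a minimal sketch of constructing a consumer with refresh_topic_interval:

require "kafka"

kafka = Kafka.new(["localhost:9092"], client_id: "example")

# Assumed value: re-scan the cluster topic list every 60 seconds.
# With the default of 0, the list is only scanned when subscribe is called.
consumer = kafka.consumer(group_id: "example-group", refresh_topic_interval: 60)

# Topics created later that match this pattern are picked up on a later refresh.
consumer.subscribe(/^events\./)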
62 changes: 53 additions & 9 deletions lib/kafka/consumer.rb
@@ -44,7 +44,7 @@ module Kafka
#
class Consumer

def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:)
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
@cluster = cluster
@logger = TaggedLogger.new(logger)
@instrumenter = instrumenter
@@ -53,6 +53,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
@session_timeout = session_timeout
@fetcher = fetcher
@heartbeat = heartbeat
@refresh_topic_interval = refresh_topic_interval

@pauses = Hash.new {|h, k|
h[k] = Hash.new {|h2, k2|
@@ -73,6 +74,15 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
# when user commits message other than last in a batch, this would make ruby-kafka refetch
# some already consumed messages
@current_offsets = Hash.new { |h, k| h[k] = {} }

# Map storing subscribed topics with their configuration
@subscribed_topics = Concurrent::Map.new

# Set of topics that have matched the subscriptions in @subscribed_topics
@matched_topics = Set.new

# Whether join_group must be executed again because new topics have been added
@join_group_for_new_topics = false
end

# Subscribes the consumer to a topic.
@@ -97,13 +107,12 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
default_offset ||= start_from_beginning ? :earliest : :latest

if topic_or_regex.is_a?(Regexp)
cluster_topics.select { |topic| topic =~ topic_or_regex }.each do |topic|
subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
end
else
subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
end
@subscribed_topics[topic_or_regex] = {
default_offset: default_offset,
start_from_beginning: start_from_beginning,
max_bytes_per_partition: max_bytes_per_partition
}
scan_for_subscribing

nil
end
@@ -402,6 +411,7 @@ def consumer_loop
while running?
begin
@instrumenter.instrument("loop.consumer") do
refresh_topic_list_if_enabled
yield
end
rescue HeartbeatError
@@ -453,6 +463,8 @@ def make_final_offsets_commit!(attempts = 3)
end

def join_group
@join_group_for_new_topics = false

old_generation_id = @group.generation_id

@group.join
@@ -514,11 +526,19 @@ def resume_paused_partitions!
end
end

def refresh_topic_list_if_enabled
return if @refresh_topic_interval <= 0
return if @refreshed_at && @refreshed_at + @refresh_topic_interval > Time.now

scan_for_subscribing
@refreshed_at = Time.now
end

def fetch_batches
# Return early if the consumer has been stopped.
return [] if shutting_down?

join_group unless @group.member?
join_group if !@group.member? || @join_group_for_new_topics

trigger_heartbeat

@@ -572,10 +592,34 @@ def clear_current_offsets(excluding: {})
end
end

def scan_for_subscribing
@subscribed_topics.each do |topic_or_regex, config|
default_offset = config.fetch(:default_offset)
start_from_beginning = config.fetch(:start_from_beginning)
max_bytes_per_partition = config.fetch(:max_bytes_per_partition)
if topic_or_regex.is_a?(Regexp)
subscribe_to_regex(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
else
subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
end
end
end

def subscribe_to_regex(topic_regex, default_offset, start_from_beginning, max_bytes_per_partition)
cluster_topics.select { |topic| topic =~ topic_regex }.each do |topic|
subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
end
end

def subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
return if @matched_topics.include?(topic)
@matched_topics.add(topic)
@join_group_for_new_topics = true

@group.subscribe(topic)
@offset_manager.set_default_offset(topic, default_offset)
@fetcher.subscribe(topic, max_bytes_per_partition: max_bytes_per_partition)
@cluster.mark_as_stale!
end

def cluster_topics
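To show how the pieces above interact at runtime, a hedged end-to-end sketch (topic names, the 5-second interval, and the broker address are illustrative assumptions): a regex subscription is stored in @subscribed_topics, refresh_topic_list_if_enabled re-runs scan_for_subscribing at most once per interval, and a newly matching topic sets @join_group_for_new_topics so the next fetch forces a rebalance.

require "kafka"

kafka = Kafka.new(["localhost:9092"], client_id: "refresh-demo")

# One matching topic exists up front; a second is created while consuming.
kafka.create_topic("orders.eu", num_partitions: 1, replication_factor: 1)

# Assumed interval: re-scan the cluster topic list every 5 seconds.
consumer = kafka.consumer(group_id: "refresh-demo-group", refresh_topic_interval: 5)
consumer.subscribe(/^orders\./)

consumer_thread = Thread.new do
  consumer.each_message do |message|
    puts "#{message.topic}: #{message.value}"
  end
end

kafka.deliver_message("from eu", topic: "orders.eu")

# Created after the consumer started; it matches the regex, so a later refresh
# subscribes to it and a fresh join_group (rebalance) is triggered.
kafka.create_topic("orders.us", num_partitions: 1, replication_factor: 1)
kafka.deliver_message("from us", topic: "orders.us")

sleep 15  # allow at least one refresh interval to elapse
consumer.stop
consumer_thread.join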
6 changes: 6 additions & 0 deletions spec/consumer_spec.rb
@@ -137,12 +137,14 @@
allow(cluster).to receive(:add_target_topics)
allow(cluster).to receive(:disconnect)
allow(cluster).to receive(:refresh_metadata_if_necessary!)
allow(cluster).to receive(:mark_as_stale!)

allow(offset_manager).to receive(:commit_offsets)
allow(offset_manager).to receive(:commit_offsets_if_necessary)
allow(offset_manager).to receive(:set_default_offset)
allow(offset_manager).to receive(:mark_as_processed)
allow(offset_manager).to receive(:next_offset_for) { 42 }
allow(offset_manager).to receive(:clear_offsets)

allow(group).to receive(:subscribe)
allow(group).to receive(:group_id)
@@ -151,11 +153,15 @@
allow(group).to receive(:subscribed_partitions) { assigned_partitions }
allow(group).to receive(:assigned_to?) { false }
allow(group).to receive(:assigned_to?).with('greetings', 0) { true }
allow(group).to receive(:generation_id) { 1 }
allow(group).to receive(:join)
allow(group).to receive(:assigned_partitions) { [] }

allow(heartbeat).to receive(:trigger)

allow(fetcher).to receive(:data?) { fetched_batches.any? }
allow(fetcher).to receive(:poll) { [:batches, fetched_batches] }
allow(fetcher).to receive(:reset)

consumer.subscribe("greetings")
end
41 changes: 41 additions & 0 deletions spec/functional/consumer_group_spec.rb
@@ -113,6 +113,47 @@
expect(received_messages.map(&:value).map(&:to_i).sort).to match_array messages
end

example "subscribing to multiple topics using regex and enable refreshing the topic list" do
topic_a = generate_topic_name
topic_b = generate_topic_name

messages_a = (1..500).to_a
messages_b = (501..1000).to_a
messages = messages_a + messages_b

producer = Kafka.new(kafka_brokers, client_id: "test").producer

messages_a.each { |i| producer.produce(i.to_s, topic: topic_a) }
producer.deliver_messages

group_id = "test#{rand(1000)}"

received_messages = []

kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, refresh_topic_interval: 1)
consumer.subscribe(/#{topic_a}|#{topic_b}/)

thread = Thread.new do
consumer.each_message do |message|
received_messages << message

if received_messages.count == messages.count
consumer.stop
end
end
end
thread.abort_on_exception = true

sleep 1
messages_b.each { |i| producer.produce(i.to_s, topic: topic_b) }
producer.deliver_messages

thread.join

expect(received_messages.map(&:value).map(&:to_i).sort).to match_array messages
end

example "consuming messages from a topic that's being written to" do
num_partitions = 3
topic = create_random_topic(num_partitions: num_partitions)