Class: Kafka::ConsumerGroup::Assignor

Inherits:
Object
Defined in:
lib/kafka/consumer_group/assignor.rb

Overview

A consumer group partition assignor

Defined Under Namespace

Classes: Partition

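Instance Method Summary

  • #assign(members:, topics:) ⇒ Hash<String, Kafka::Protocol::MemberAssignment>

    Assign the topic partitions to the group members.

  • #initialize(cluster:, strategy:) ⇒ Assignor

    Returns a new instance of Assignor.

  • #protocol_name ⇒ Object
  • #user_data ⇒ Object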

Constructor Details

#initialize(cluster:, strategy:) ⇒ Assignor

Returns a new instance of Assignor.

Parameters:

  • cluster (Kafka::Cluster)
  • strategy (Object)

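    an object that responds to #call(cluster:, members:, partitions:) and may optionally implement #protocol_name and #user_data. These are the methods that the source listings on this page invoke on the strategy.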

    # File 'lib/kafka/consumer_group/assignor.rb', line 15

    def initialize(cluster:, strategy:)
      @cluster = cluster
      @strategy = strategy
    end
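A minimal sketch of an object that could be passed as strategy:, assuming the interface visible in the source listings on this page (a #call method taking cluster:, members:, and partitions:, plus an optional #protocol_name). The names RoundRobinSketch and PartitionSketch are hypothetical stand-ins so the snippet runs without the gem; they are not part of the library.

```ruby
# Hypothetical stand-in for the Partition objects the assignor passes in.
PartitionSketch = Struct.new(:topic, :partition_id)

# Hypothetical strategy: deals partitions out to members in turn.
class RoundRobinSketch
  def protocol_name
    "roundrobin-sketch"
  end

  # Returns a Hash of member_id => Array of partitions.
  def call(cluster:, members:, partitions:)
    member_ids = members.keys
    return {} if member_ids.empty?

    assignment = member_ids.each_with_object({}) { |id, acc| acc[id] = [] }
    partitions.each_with_index do |partition, index|
      assignment[member_ids[index % member_ids.size]] << partition
    end
    assignment
  end
end

partitions = [0, 1, 2].map { |id| PartitionSketch.new("events", id) }
result = RoundRobinSketch.new.call(
  cluster: nil, # unused by this sketch
  members: { "a" => nil, "b" => nil },
  partitions: partitions
)
```

With the gem loaded, such an object would be handed to the constructor as Kafka::ConsumerGroup::Assignor.new(cluster: cluster, strategy: RoundRobinSketch.new).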

Instance Method Details

#assign(members:, topics:) ⇒ Hash<String, Kafka::Protocol::MemberAssignment>

Assign the topic partitions to the group members.

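Parameters:

  • members (Hash)

    a hash keyed by member ID.

  • topics (Array<String>)

    the names of the topics whose partitions should be assigned.

Returns:

  • (Hash<String, Kafka::Protocol::MemberAssignment>)

    a hash mapping each member ID to its assignment.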

    # File 'lib/kafka/consumer_group/assignor.rb', line 35

    def assign(members:, topics:)
      topic_partitions = topics.flat_map do |topic|
        begin
          partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
        rescue UnknownTopicOrPartition
          raise UnknownTopicOrPartition, "unknown topic #{topic}"
        end
        partition_ids.map {|partition_id| Partition.new(topic, partition_id) }
      end

      group_assignment = {}

      members.each_key do |member_id|
        group_assignment[member_id] = Protocol::MemberAssignment.new
      end

      @strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
        Array(partitions).each do |partition|
          group_assignment[member_id].assign(partition.topic, [partition.partition_id])
        end
      end

      group_assignment
    rescue Kafka::LeaderNotAvailable
      sleep 1
      retry
    end
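The merge step in the listing above can be sketched in isolation. Every member first receives an empty assignment, and the strategy's result is then folded in one partition at a time, so a member the strategy leaves out still appears in the returned hash. PartitionStub and MemberAssignmentStub are hypothetical stand-ins, and the accumulating behavior of #assign on the stub is an assumption made for illustration.

```ruby
# Hypothetical stand-ins so the snippet runs without the gem.
class PartitionStub
  attr_reader :topic, :partition_id

  def initialize(topic, partition_id)
    @topic = topic
    @partition_id = partition_id
  end
end

class MemberAssignmentStub
  attr_reader :topics

  def initialize
    @topics = {}
  end

  # Assumed behavior: append the partition ids under the topic.
  def assign(topic, partition_ids)
    (@topics[topic] ||= []).concat(partition_ids)
  end
end

members = { "consumer-1" => nil, "consumer-2" => nil, "consumer-3" => nil }

# What a strategy might return: member_id => partitions.
strategy_result = {
  "consumer-1" => [PartitionStub.new("events", 0), PartitionStub.new("events", 1)],
  "consumer-2" => [PartitionStub.new("logs", 0)]
}

# Step 1: every member starts with an empty assignment.
group_assignment = {}
members.each_key { |member_id| group_assignment[member_id] = MemberAssignmentStub.new }

# Step 2: fold the strategy result in, one partition at a time.
strategy_result.each do |member_id, partitions|
  Array(partitions).each do |partition|
    group_assignment[member_id].assign(partition.topic, [partition.partition_id])
  end
end

summary = group_assignment.transform_values(&:topics)
```

Here consumer-3 ends up with an empty assignment rather than being absent. The stub partition is a plain class rather than a Struct on purpose: Array() calls #to_a on a Struct and would split it into its fields.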

#protocol_name ⇒ Object

    # File 'lib/kafka/consumer_group/assignor.rb', line 20

    def protocol_name
      @strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s
    end
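The fallback in the listing above means a strategy that does not define #protocol_name is identified by its class name. A small sketch of that check, using a hypothetical helper (protocol_name_for) and two hypothetical strategy classes:

```ruby
# Hypothetical helper mirroring the respond_to? check in #protocol_name.
def protocol_name_for(strategy)
  strategy.respond_to?(:protocol_name) ? strategy.protocol_name : strategy.class.to_s
end

# Hypothetical strategy that names its own protocol.
class NamedStrategy
  def protocol_name
    "custom-v1"
  end
end

# Hypothetical strategy that relies on the class-name fallback.
class UnnamedStrategy; end

named = protocol_name_for(NamedStrategy.new)
unnamed = protocol_name_for(UnnamedStrategy.new)
```

Under these assumptions, named is "custom-v1" and unnamed is "UnnamedStrategy".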

#user_data ⇒ Object

    # File 'lib/kafka/consumer_group/assignor.rb', line 24

    def user_data
      @strategy.user_data if @strategy.respond_to?(:user_data)
    end
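Because of the trailing if in the listing above, #user_data evaluates to nil when the strategy does not implement the method. A brief sketch with a hypothetical helper (user_data_for) and a hypothetical strategy class; the payload string is illustrative only:

```ruby
# Hypothetical helper mirroring the guard in #user_data.
def user_data_for(strategy)
  strategy.user_data if strategy.respond_to?(:user_data)
end

# Hypothetical strategy that attaches extra data to its membership.
class RackAwareSketch
  def user_data
    "rack=eu-west-1a"
  end
end

with_data = user_data_for(RackAwareSketch.new)
without_data = user_data_for(Object.new)
```

Here with_data holds the string returned by the strategy, and without_data is nil.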