Class: Kafka::Protocol::MetadataResponse

Inherits:
Object
Defined in:
lib/kafka/protocol/metadata_response.rb

Overview

A response to a MetadataRequest.

The response contains information on the brokers, topics, and partitions in the cluster.

  • For each broker a node id, host, and port is provided.
  • For each topic partition the node id of the broker acting as partition leader, as well as a list of node ids for the set of replicas, are given. The isr list is the subset of replicas that are "in sync", i.e. have fully caught up with the leader.
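
The relationship between leader, replicas, and isr can be sketched with plain Ruby structs. The `SketchBroker` and `SketchPartition` structs and the host names below are hypothetical stand-ins made up for illustration; they are not the library's own classes.

```ruby
# Hypothetical stand-ins for illustration; not the library's own classes.
SketchBroker = Struct.new(:node_id, :host, :port, keyword_init: true)
SketchPartition = Struct.new(:partition_id, :leader, :replicas, :isr, keyword_init: true)

brokers = [
  SketchBroker.new(node_id: 1, host: "kafka1", port: 9092),
  SketchBroker.new(node_id: 2, host: "kafka2", port: 9092),
  SketchBroker.new(node_id: 3, host: "kafka3", port: 9092),
]

# Partition 0 is led by broker 1 and replicated to brokers 1, 2 and 3.
# Broker 3 has fallen behind, so it is absent from the isr list.
partition = SketchPartition.new(partition_id: 0, leader: 1, replicas: [1, 2, 3], isr: [1, 2])

# Replicas that are assigned to the partition but not currently in sync.
lagging = partition.replicas - partition.isr

# Resolve the leader's node id to its broker entry.
leader = brokers.find { |b| b.node_id == partition.leader }
```

Since the isr list is a subset of the replicas, subtracting it from the replica list yields the node ids that have not caught up with the leader.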

API Specification

    MetadataResponse => [Broker][TopicMetadata]
      Broker => NodeId Host Port  (any number of brokers may be returned)
        NodeId => int32
        Host => string
        Port => int32

      TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
        TopicErrorCode => int16

      PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
        PartitionErrorCode => int16
        PartitionId => int32
        Leader => int32
        Replicas => [int32]
        Isr => [int32]
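
To make the grammar concrete, here is a minimal sketch that encodes and reads back a one-element `[Broker]` array. It assumes the usual Kafka wire conventions: big-endian signed integers, strings prefixed with an int16 length, and arrays prefixed with an int32 element count. `SketchReader` is a hypothetical class written for this example and is not the library's Decoder. The sketch follows the listing above only; the `decode` method documented on this page also reads a rack string per broker, a controller id, and an is-internal flag per topic, none of which appear in the listing.

```ruby
require "stringio"

# Hypothetical minimal reader, for illustration only. It does not handle
# null strings or null arrays (negative length prefixes).
class SketchReader
  def initialize(bytes)
    @io = StringIO.new(bytes)
  end

  # int16: 2 bytes, big-endian, signed.
  def int16
    @io.read(2).unpack1("s>")
  end

  # int32: 4 bytes, big-endian, signed.
  def int32
    @io.read(4).unpack1("l>")
  end

  # string: int16 byte length followed by that many bytes.
  def string
    length = int16
    @io.read(length)
  end

  # [T]: int32 element count, then the block is called once per element.
  def array
    Array.new(int32) { yield }
  end
end

# Encode [Broker] with a single broker: count, then NodeId Host Port.
host = "localhost"
bytes = [1].pack("l>")               # array length: one broker
bytes << [42].pack("l>")             # NodeId
bytes << [host.bytesize].pack("s>")  # Host length prefix
bytes << host                        # Host bytes
bytes << [9092].pack("l>")           # Port

reader = SketchReader.new(bytes)
decoded_brokers = reader.array do
  { node_id: reader.int32, host: reader.string, port: reader.int32 }
end
```

The fields must be read in the order the grammar lists them, because the format carries no field names or tags.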

Defined Under Namespace

Classes: PartitionMetadata, TopicMetadata

Constructor Details

#initialize(brokers:, controller_id:, topics:) ⇒ MetadataResponse

Returns a new instance of MetadataResponse.

    # File 'lib/kafka/protocol/metadata_response.rb', line 75

    def initialize(brokers:, controller_id:, topics:)
      @brokers = brokers
      @controller_id = controller_id
      @topics = topics
    end

Instance Attribute Details

#brokers ⇒ Array<Kafka::BrokerInfo> (readonly)

Returns the list of brokers in the cluster.

Returns:

  • (Array<Kafka::BrokerInfo>)

    # File 'lib/kafka/protocol/metadata_response.rb', line 67

    def brokers
      @brokers
    end

#controller_id ⇒ Integer (readonly)

Returns the broker id of the controller broker.

Returns:

  • (Integer)

    The broker id of the controller broker.

    # File 'lib/kafka/protocol/metadata_response.rb', line 73

    def controller_id
      @controller_id
    end

#topics ⇒ Array<TopicMetadata> (readonly)

Returns the list of topics in the cluster.

Returns:

  • (Array<TopicMetadata>)

    # File 'lib/kafka/protocol/metadata_response.rb', line 70

    def topics
      @topics
    end

Class Method Details

.decode(decoder) ⇒ MetadataResponse

Decodes a MetadataResponse from a Decoder containing response data.

Parameters:

  • decoder (Decoder)

Returns:

  • (MetadataResponse)

    # File 'lib/kafka/protocol/metadata_response.rb', line 143

    def self.decode(decoder)
      brokers = decoder.array do
        node_id = decoder.int32
        host = decoder.string
        port = decoder.int32
        _rack = decoder.string

        BrokerInfo.new(
          node_id: node_id,
          host: host,
          port: port
        )
      end

      controller_id = decoder.int32

      topics = decoder.array do
        topic_error_code = decoder.int16
        topic_name = decoder.string
        _is_internal = decoder.boolean

        partitions = decoder.array do
          PartitionMetadata.new(
            partition_error_code: decoder.int16,
            partition_id: decoder.int32,
            leader: decoder.int32,
            replicas: decoder.array { decoder.int32 },
            isr: decoder.array { decoder.int32 },
          )
        end

        TopicMetadata.new(
          topic_error_code: topic_error_code,
          topic_name: topic_name,
          partitions: partitions,
        )
      end

      new(brokers: brokers, controller_id: controller_id, topics: topics)
    end

Instance Method Details

#controller_broker ⇒ Object

    # File 'lib/kafka/protocol/metadata_response.rb', line 123

    def controller_broker
      find_broker(controller_id)
    end

#find_broker(node_id) ⇒ Kafka::BrokerInfo

Finds the broker info for the given node id.

Parameters:

  • node_id (Integer)

    the node id of the broker.

Returns:

  • (Kafka::BrokerInfo)

Raises:

  • (Kafka::NoSuchBroker)

    # File 'lib/kafka/protocol/metadata_response.rb', line 115

    def find_broker(node_id)
      broker = @brokers.find {|b| b.node_id == node_id }

      raise Kafka::NoSuchBroker, "No broker with id #{node_id}" if broker.nil?

      broker
    end
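
The lookup raises instead of returning nil when the node id is unknown, so callers that can tolerate a missing broker need to rescue the error. The sketch below reproduces that behavior outside the library so it can run on its own; `SketchBrokerInfo`, `SketchNoSuchBroker`, and `sketch_find_broker` are hypothetical stand-ins for `Kafka::BrokerInfo`, `Kafka::NoSuchBroker`, and `#find_broker`.

```ruby
# Hypothetical stand-ins; the real classes live in the library.
SketchBrokerInfo = Struct.new(:node_id, :host, :port)
class SketchNoSuchBroker < StandardError; end

# Same lookup logic as the listing above, taking the broker list as an argument.
def sketch_find_broker(brokers, node_id)
  broker = brokers.find { |b| b.node_id == node_id }
  raise SketchNoSuchBroker, "No broker with id #{node_id}" if broker.nil?
  broker
end

known = [SketchBrokerInfo.new(1, "kafka1", 9092)]

# A known node id returns the matching broker entry.
found = sketch_find_broker(known, 1)

# An unknown node id raises; here the message is captured for inspection.
missing_message =
  begin
    sketch_find_broker(known, 99)
    nil
  rescue SketchNoSuchBroker => e
    e.message
  end
```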

#find_leader_id(topic, partition) ⇒ Integer

Finds the node id of the broker that is acting as leader for the given topic and partition per this metadata.

Parameters:

  • topic (String)

    the name of the topic.

  • partition (Integer)

    the partition number.

Returns:

  • (Integer)

    the node id of the leader.

    # File 'lib/kafka/protocol/metadata_response.rb', line 87

    def find_leader_id(topic, partition)
      topic_info = @topics.find {|t| t.topic_name == topic }

      if topic_info.nil?
        raise UnknownTopicOrPartition, "no topic #{topic}"
      end

      Protocol.handle_error(topic_info.topic_error_code)

      partition_info = topic_info.partitions.find {|p| p.partition_id == partition }

      if partition_info.nil?
        raise UnknownTopicOrPartition, "no partition #{partition} in topic #{topic}"
      end

      begin
        Protocol.handle_error(partition_info.partition_error_code)
      rescue ReplicaNotAvailable
        # This error can be safely ignored per the protocol specification.
      end

      partition_info.leader
    end
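
The partition-level error handling is the subtle part of this method: a ReplicaNotAvailable error is swallowed and the leader is still returned, while any other partition error propagates to the caller. The following sketch isolates that step. All names prefixed with `Sketch` or `sketch_` are hypothetical, and `sketch_handle_error` only assumes that `Protocol.handle_error` does nothing for code 0 and raises a mapped exception otherwise. The numeric codes 5 (leader not available) and 9 (replica not available) are taken from the Kafka protocol's error code table.

```ruby
# Hypothetical stand-ins for the library's error classes.
class SketchLeaderNotAvailable < StandardError; end
class SketchReplicaNotAvailable < StandardError; end

# Kafka protocol error codes used in this sketch.
SKETCH_ERRORS = {
  5 => SketchLeaderNotAvailable,
  9 => SketchReplicaNotAvailable,
}.freeze

# Assumed behavior: no-op for code 0, raise the mapped error otherwise.
def sketch_handle_error(code)
  return if code.zero?
  raise SKETCH_ERRORS.fetch(code, StandardError), "error code #{code}"
end

SketchPartitionInfo = Struct.new(:partition_id, :partition_error_code, :leader)

# Mirrors the final step of the listing above for a single partition entry.
def sketch_leader_for(partition_info)
  begin
    sketch_handle_error(partition_info.partition_error_code)
  rescue SketchReplicaNotAvailable
    # A replica is down, but the leader field is still usable.
  end

  partition_info.leader
end

healthy  = sketch_leader_for(SketchPartitionInfo.new(0, 0, 1))
degraded = sketch_leader_for(SketchPartitionInfo.new(1, 9, 2))

leaderless =
  begin
    sketch_leader_for(SketchPartitionInfo.new(2, 5, -1))
  rescue SketchLeaderNotAvailable
    :leader_not_available
  end
```

Here `healthy` and `degraded` both resolve to a leader id, whereas the third partition raises because its error is not the one being ignored.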

#partitions_for(topic_name) ⇒ Object

    # File 'lib/kafka/protocol/metadata_response.rb', line 127

    def partitions_for(topic_name)
      topic = @topics.find {|t| t.topic_name == topic_name }

      if topic.nil?
        raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
      end

      Protocol.handle_error(topic.topic_error_code)

      topic.partitions
    end