Skip to content

Conversation

abicky
Copy link
Contributor

@abicky abicky commented Mar 24, 2020

This PR removes an unnecessary reconnection after Kafka::Consumer#stop is called.

@cluster.disconnect closes the connections to all brokers including the coordinator in spite of the fact that @group.leave requires a connection to the coordinator, so call @cluster.disconnect after @group.leave.

You can confirm the behavior by executing the following script:

require "kafka" CLIENT_ID = ENV["CLIENT_ID"] GROUP_ID = ENV["GROUP_ID"] TOPIC = ENV["TOPIC"] $stdout.sync = true logger = Logger.new($stdout) logger.level = Logger::DEBUG kafka = Kafka.new(["localhost:9092"], client_id: CLIENT_ID, logger: logger) consumer = kafka.consumer(group_id: GROUP_ID) consumer.subscribe(TOPIC) Thread.new do # Wait for rebalance to end sleep 5 consumer.stop end consumer.each_message do |message| puts message.topic, message.partition puts message.offset, message.key, message.value end

Before

-- snip -- D, [2020-03-25T05:35:02.636289 #32996] DEBUG -- : [[g] {test: 0, 2, 1}:] Handling fetcher command: stop I, [2020-03-25T05:35:02.636329 #32996] INFO -- : [g] {test: 0, 2, 1}: Fetcher thread exited. I, [2020-03-25T05:35:02.636470 #32996] INFO -- : Disconnecting broker 0 D, [2020-03-25T05:35:02.636522 #32996] DEBUG -- : Closing socket to localhost:9092 I, [2020-03-25T05:35:03.633512 #32996] INFO -- : [[g] {}:] Leaving group `g` D, [2020-03-25T05:35:03.633671 #32996] DEBUG -- : [[g] {}:] [leave_group] Opening connection to localhost:9092 with client id c... D, [2020-03-25T05:35:03.634838 #32996] DEBUG -- : [[g] {}:] [leave_group] Sending leave_group API request 1 to localhost:9092 D, [2020-03-25T05:35:03.635018 #32996] DEBUG -- : [[g] {}:] [leave_group] Waiting for response 1 from localhost:9092 D, [2020-03-25T05:35:03.636950 #32996] DEBUG -- : [[g] {}:] [leave_group] Received response 1 from localhost:9092 

After

-- snip -- D, [2020-03-25T05:36:31.482775 #33053] DEBUG -- : [[g] {test: 0, 2, 1}:] Handling fetcher command: stop I, [2020-03-25T05:36:31.482809 #33053] INFO -- : [g] {test: 0, 2, 1}: Fetcher thread exited. I, [2020-03-25T05:36:32.480502 #33053] INFO -- : [[g] {}:] Leaving group `g` D, [2020-03-25T05:36:32.480612 #33053] DEBUG -- : [[g] {}:] [leave_group] Sending leave_group API request 6 to localhost:9092 D, [2020-03-25T05:36:32.480760 #33053] DEBUG -- : [[g] {}:] [leave_group] Waiting for response 6 from localhost:9092 D, [2020-03-25T05:36:32.482325 #33053] DEBUG -- : [[g] {}:] [leave_group] Received response 6 from localhost:9092 I, [2020-03-25T05:36:32.482380 #33053] INFO -- : [[g] {}:] Disconnecting broker 0 D, [2020-03-25T05:36:32.482403 #33053] DEBUG -- : [[g] {}:] Closing socket to localhost:9092 

As you can see above, the message "Opening connection to localhost:9092" disappeared in the log after the change.

This commit removes an unnecessary reconnection after `Kafka::Consumer#stop` is called. `@cluster.disconnect` closes the connections to all brokers including the coordinator in spite of the fact that `@group.leave` requires a connection to the coordinator, so call `@cluster.disconnect` after `@group.leave`.
@dasch dasch merged commit 8660d05 into zendesk:master Apr 20, 2020
@dasch
Copy link
Contributor

dasch commented Apr 20, 2020

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

2 participants