Skip to content

Conversation

edmocosta
Copy link
Contributor

@edmocosta edmocosta commented Jun 7, 2023

There are two flows to shutdown the RabbitMQ consumers. When the plugin is the one shutting it down, it should send a channel cancellation message by invoking @hare_info.channel.basic_cancel(@consumer.consumer_tag) and waiting for the consumer to terminate (once the broker replies with a basic.cancel-ok message). This back-and-forth is handled by the MarchHare client. On the other hand, when the broker requests the client to shut down (eg. due to queue deletion). It sends the client a basic.cancel message, which is handled internally by the client (handleCancel), unregistering the consumer and then invoking the :on_cancellation callback. In that case, the plugin should not do anything as the consumer is already canceled/unregistered.

This PR adds an extra condition on the shutdown to check whether the client is already canceled/terminated (by the broker) or if it should execute the plugin cancellation flow.


Closes: #40

@edmocosta edmocosta changed the title Fixed the cancellation flow to avoid multiple invocations of basic.cancel Fix the cancellation flow to avoid multiple invocations of basic.cancel Jun 7, 2023
@andsel andsel self-requested a review June 8, 2023 14:28
@andsel
Copy link

andsel commented Jun 8, 2023

Verified with:

  • run docker RabbitMQ:
docker run -v rabbitmq-data:/var/lib/rabbitmq --hostname my-rabbit --name rabbit_dev -p 15672:15672 -p 5672:5672 rabbitmq:3-management
  • created sysmsg exchange. on RAbbitMQ admin console (guest:guest localhost:15672)
  • used following pipeline config:
input { rabbitmq { codec => plain # codec (optional), default: "plain" exchange => "sysmsg" # string (optional) host => "localhost" # string (required) vhost => "/" user => 'guest' password => 'guest' queue => "iot_queue" durable => true } } output { stdout { codec => rubydebug } } 
  • published a message on the queue to check RabbitMQ-LS connectivity
  • from admin UI removed the queue.

With this PR no errors and LS process terminates (because the only pipeline has terminated), while without I got the error:

[2023-06-08T16:53:53,036][ERROR][com.rabbitmq.client.impl.ForgivingExceptionHandler][main][d54009ece8504966542a93228eaa8e792137c5f427a488689c17a5755449d957] Consumer rubyobj.MarchHare.CallbackConsumer@1a14c353 (amq.ctag-WcPm9PfEm68qkv8lLBXc8w) method handleCancel for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1)threw an exception for channel AMQChannel(amqp://guest@127.0.0.1:5672/,1) java.io.IOException: Unknown consumerTag	at com.rabbitmq.client.impl.ChannelN.basicCancel(com/rabbitmq/client/impl/ChannelN.java:1476) ~[rabbitmq-client.jar:5.16.0]	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:77) ~[?:?]	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43) ~[?:?]	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:568) ~[?:?]	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:427) ~[jruby.jar:?]	at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:294) ~[jruby.jar:?]	at org.jruby.RubyClass.finvokeWithRefinements(org/jruby/RubyClass.java:514) ~[jruby.jar:?]	at org.jruby.RubyBasicObject.send(org/jruby/RubyBasicObject.java:1733) ~[jruby.jar:?]	at org.jruby.RubyBasicObject$INVOKER$i$send.call(org/jruby/RubyBasicObject$INVOKER$i$send.gen) ~[jruby.jar:?]	at RUBY.method_missing(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/2.6.0/gems/march_hare-4.5.0-java/lib/march_hare/channel.rb:888) ~[?:?]	at RUBY.shutdown_consumer(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/2.6.0/gems/logstash-integration-rabbitmq-7.3.2-java/lib/logstash/inputs/rabbitmq.rb:312) ~[?:?]	at RUBY.stop(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/2.6.0/gems/logstash-integration-rabbitmq-7.3.2-java/lib/logstash/inputs/rabbitmq.rb:306) ~[?:?]	at RUBY.on_cancellation(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/2.6.0/gems/logstash-integration-rabbitmq-7.3.2-java/lib/logstash/inputs/rabbitmq.rb:324) ~[?:?]	at RUBY.consume!(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/2.6.0/gems/logstash-integration-rabbitmq-7.3.2-java/lib/logstash/inputs/rabbitmq.rb:241) ~[?:?]	at org.jruby.RubyProc.call(org/jruby/RubyProc.java:277) ~[jruby.jar:?]	at org.jruby.RubyProc$INVOKER$i$call.call(org/jruby/RubyProc$INVOKER$i$call.gen) ~[jruby.jar:?]	at RUBY.handleCancel(/Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/2.6.0/gems/march_hare-4.5.0-java/lib/march_hare/consumers/base.rb:37) ~[?:?]	at org.jruby.RubyClass.finvoke(org/jruby/RubyClass.java:784) ~[jruby.jar:?]	at org.jruby.RubyBasicObject.callMethod(org/jruby/RubyBasicObject.java:370) ~[jruby.jar:?]	at rubyobj.MarchHare.BaseConsumer.handleCancel(rubyobj/MarchHare//Users/andrea/workspace/logstash_andsel/vendor/bundle/jruby/2.6.0/gems/march_hare-4.5.0-java/lib/march_hare/consumers/base.rb:30) ~[?:?]	at com.rabbitmq.client.impl.ConsumerDispatcher$3.run(com/rabbitmq/client/impl/ConsumerDispatcher.java:115) [rabbitmq-client.jar:5.16.0]	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(com/rabbitmq/client/impl/ConsumerWorkService.java:111) [rabbitmq-client.jar:5.16.0]	at java.util.concurrent.ThreadPoolExecutor.runWorker(java/util/concurrent/ThreadPoolExecutor.java:1136) [?:?]	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java/util/concurrent/ThreadPoolExecutor.java:635) [?:?]	at java.lang.Thread.run(java/lang/Thread.java:833) [?:?] 
Copy link

@andsel andsel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@edmocosta edmocosta merged commit 5c94328 into logstash-plugins:main Jun 9, 2023
@edmocosta edmocosta deleted the fix-cancellation-flow branch June 9, 2023 08:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

3 participants