Skip to content
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 7.3.0
- Refactor: logging improvements [#47](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/47)
* integrated MarchHare logging to be part of Logstash's log instead of using std-err
* normalized logging format on (Ruby) errors

## 7.2.0
- Remove ruby pipeline dependency. Starting from Logstash 8, Ruby execution engine is not available. All pipelines should use Java pipeline [#39](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/39)

Expand Down
24 changes: 10 additions & 14 deletions lib/logstash/inputs/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,9 @@ def run(output_queue)
@output_queue = output_queue
consume!
rescue => e
raise unless stop?
raise(e) unless stop?

logger.warn("Ignoring exception thrown during plugin shutdown",
:message => e.message,
:class => e.class.name,
:location => e.backtrace.first)
@logger.warn("Ignoring exception thrown during plugin shutdown", error_details(e))
end

def setup!
Expand All @@ -200,10 +197,7 @@ def setup!

reset!

@logger.warn("Error while setting up connection for rabbitmq input! Will retry.",
:message => e.message,
:class => e.class.name,
:location => e.backtrace.first)
@logger.warn("Error while setting up connection, will retry", error_details(e))
sleep_for_retry
retry
end
Expand All @@ -215,7 +209,7 @@ def setup!
def reset!
@hare_info.connection && @hare_info.connection.close
rescue => e
@logger.debug("Exception while resetting connection", :exception => e.message, :backtrace => e.backtrace)
@logger.debug("Exception while resetting connection", error_details(e))
ensure
@hare_info = nil
end
Expand Down Expand Up @@ -250,8 +244,8 @@ def consume!

begin
@hare_info.queue.subscribe_with(@consumer, :manual_ack => @ack)
rescue MarchHare::Exception => e
@logger.warn("Could not subscribe to queue! Will retry in #{@subscription_retry_interval_seconds} seconds", :queue => @queue)
rescue => e
@logger.warn("Could not subscribe to queue, will retry in #{@subscription_retry_interval_seconds} seconds", error_details(e, :queue => @queue))

sleep @subscription_retry_interval_seconds
retry
Expand Down Expand Up @@ -316,15 +310,17 @@ def stop
def shutdown_consumer
return unless @consumer
@hare_info.channel.basic_cancel(@consumer.consumer_tag)
connection = @hare_info.connection
until @consumer.terminated?
@logger.info("Waiting for rabbitmq consumer to terminate before stopping!", :params => self.params)
@logger.info("Waiting for RabbitMQ consumer to terminate before stopping", url: connection_url(connection))
sleep 1
end
end

def on_cancellation
if !stop? # If this isn't already part of a regular stop
@logger.info("Received basic.cancel from #{rabbitmq_settings[:host]}, shutting down.")
connection = @hare_info.connection
@logger.info("Received cancellation, shutting down", url: connection_url(connection))
stop
end
end
Expand Down
5 changes: 1 addition & 4 deletions lib/logstash/outputs/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,7 @@ def publish(event, message)
local_exchange.publish(message, :routing_key => routing_key, :properties => message_properties)
end
rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e
@logger.error("Error while publishing. Will retry.",
:message => e.message,
:exception => e.class,
:backtrace => e.backtrace)
@logger.error("Error while publishing, will retry", error_details(e, backtrace: true))

sleep_for_retry
retry
Expand Down
110 changes: 67 additions & 43 deletions lib/logstash/plugin_mixins/rabbitmq_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ def rabbitmq_settings
s = {
:vhost => @vhost,
:addresses => addresses_from_hosts_and_port(@host, @port),
:user => @user,
:username => @user,
:automatic_recovery => @automatic_recovery,
:pass => @password ? @password.value : "guest",
:password => @password ? @password.value : "guest",
}

s[:timeout] = @connection_timeout || 0
s[:heartbeat] = @heartbeat || 0
s[:connection_timeout] = @connection_timeout || 0
s[:requested_heartbeat] = @heartbeat || 0

if @ssl
s[:tls] = @ssl_version
Expand All @@ -142,7 +142,7 @@ def addresses_from_hosts_and_port(hosts, port)
def connect!
@hare_info = connect() unless @hare_info # Don't duplicate the conn!
rescue MarchHare::Exception, java.io.IOException => e
error_message = if e.message.empty? && e.is_a?(java.io.IOException)
message = if e.message.empty? && e.is_a?(java.io.IOException)
# IOException with an empty message is probably an instance of
# these problems:
# https://github.com/logstash-plugins/logstash-output-rabbitmq/issues/52
Expand All @@ -151,21 +151,12 @@ def connect!
# Best guess is to help the user understand that there is probably
# some kind of configuration problem causing the error, but we
# can't really offer any more detailed hints :\
"An unknown error occurred. RabbitMQ gave no hints as to the cause. Maybe this is a configuration error (invalid vhost, for example). I recommend checking the RabbitMQ server logs for clues about this failure."
"An unknown RabbitMQ error occurred, maybe this is a configuration error (invalid vhost, for example) - please check the RabbitMQ server logs for clues about this failure"
else
e.message
"RabbitMQ connection error, will retry"
end

if @logger.debug?
@logger.error("RabbitMQ connection error, will retry.",
:error_message => error_message,
:exception => e.class.name,
:backtrace => e.backtrace)
else
@logger.error("RabbitMQ connection error, will retry.",
:error_message => error_message,
:exception => e.class.name)
end
@logger.error(message, error_details(e))

sleep_for_retry
retry
Expand All @@ -179,48 +170,43 @@ def connection_open?
@hare_info && @hare_info.connection && @hare_info.connection.open?
end

def connected?
return nil unless @hare_info && @hare_info.connection
@hare_info.connection.connected?
end

private

def declare_exchange!(channel, exchange, exchange_type, durable)
@logger.debug? && @logger.debug("Declaring an exchange", :name => exchange,
:type => exchange_type, :durable => durable)
exchange = channel.exchange(exchange, :type => exchange_type.to_sym, :durable => durable)
@logger.debug? && @logger.debug("Exchange declared")
exchange
rescue StandardError => e
@logger.error("Could not declare exchange!",
:exchange => exchange, :type => exchange_type,
:durable => durable, :error_class => e.class.name,
:error_message => e.message, :backtrace => e.backtrace)
@logger.debug? && @logger.debug("Declaring an exchange", :name => exchange, :type => exchange_type, :durable => durable)
channel.exchange(exchange, :type => exchange_type.to_sym, :durable => durable)
rescue => e
@logger.error("Could not declare exchange", error_details(e, :exchange => exchange, :type => exchange_type, :durable => durable))

raise e
end

def connect
@logger.debug? && @logger.debug("Connecting to RabbitMQ. Settings: #{rabbitmq_settings.inspect}")

connection = MarchHare.connect(rabbitmq_settings)
@logger.debug? && @logger.debug("Connecting to RabbitMQ", rabbitmq_settings)

# disable MarchHare's attempt to provide a "better" exception logging experience:
settings = rabbitmq_settings.merge :exception_handler => com.rabbitmq.client.impl.ForgivingExceptionHandler.new
connection = MarchHare.connect(settings) # MarchHare::Session.connect
# we could pass down the :logger => logger but that adds an extra:
# `logger.info("Using TLS/SSL version #{tls}")` which isn't useful
# the rest of MH::Session logging is mostly debug level details
#
# NOTE: effectively redirects MarchHare's default std-out logging to LS
# (MARCH_HARE_LOG_LEVEL=debug no longer has an effect)
connection.instance_variable_set(:@logger, LoggerAdapter.new(logger))
Copy link
Contributor Author

@kares kares Jun 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not pass :logger => logger to MarchHare directly to avoid info logging a (less-relevant) message.


connection.on_shutdown do |conn, cause|
@logger.warn("RabbitMQ connection was closed!",
:url => connection_url(conn),
:automatic_recovery => @automatic_recovery,
:cause => cause)
@logger.warn("RabbitMQ connection was closed", url: connection_url(conn), automatic_recovery: @automatic_recovery, cause: cause)
end
connection.on_blocked do
@logger.warn("RabbitMQ connection blocked! Check your RabbitMQ instance!",
:url => connection_url(connection))
@logger.warn("RabbitMQ connection blocked - please check the RabbitMQ server logs", url: connection_url(connection))
end
connection.on_unblocked do
@logger.warn("RabbitMQ connection unblocked!", :url => connection_url(connection))
@logger.warn("RabbitMQ connection unblocked", url: connection_url(connection))
end

channel = connection.create_channel
@logger.info("Connected to RabbitMQ at #{rabbitmq_settings[:host]}")
@logger.info("Connected to RabbitMQ", url: connection_url(connection))

HareInfo.new(connection, channel)
end
Expand All @@ -235,6 +221,44 @@ def connection_url(connection)
def sleep_for_retry
Stud.stoppable_sleep(@connect_retry_interval) { @rabbitmq_connection_stopping }
end

def error_details(e, info = {})
details = info.merge(:exception => e.class, :message => e.message)
if e.is_a?(MarchHare::Exception) && e.cause
details[:cause] = e.cause # likely a Java exception
end
details[:backtrace] = e.backtrace if @logger.debug? || info[:backtrace] == true
details
end

# @private adapting MarchHare's Ruby Logger assumptions
class LoggerAdapter < SimpleDelegator

java_import java.lang.Throwable

[:trace, :debug, :info, :warn, :error, :fatal].each do |level|
# sample logging used by MarchHare that we're after:
#
# rescue Exception => e
# logger.error("Caught exception when recovering queue #{q.name}")
# logger.error(e)
# end
class_eval <<-RUBY, __FILE__, __LINE__
def #{level}(arg)
if arg.is_a?(Exception) || arg.is_a?(Throwable)
details = { :exception => arg.class }
details[:cause] = arg.cause if arg.cause
details[:backtrace] = arg.backtrace
__getobj__.#{level}(arg.message.to_s, details)
else
__getobj__.#{level}(arg) # String
end
end
RUBY
end

end

end
end
end
2 changes: 1 addition & 1 deletion logstash-integration-rabbitmq.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-rabbitmq'
s.version = '7.2.0'
s.version = '7.3.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Integration with RabbitMQ - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
Expand Down
36 changes: 33 additions & 3 deletions spec/inputs/rabbitmq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
allow(connection).to receive(:on_blocked)
allow(connection).to receive(:on_unblocked)
allow(connection).to receive(:close)
allow(connection).to receive(:host).and_return host
allow(connection).to receive(:port).and_return port
allow(connection).to receive(:vhost).and_return nil
allow(connection).to receive(:user).and_return 'guest'
allow(channel).to receive(:exchange).and_return(exchange)
allow(channel).to receive(:queue).and_return(queue)
allow(channel).to receive(:prefetch=)
Expand Down Expand Up @@ -335,7 +339,7 @@ def spawn_and_wait(instance)
}

20.times do
instance.connected? ? break : sleep(0.1)
instance.send(:connection_open?) ? break : sleep(0.1)
end

# Extra time to make sure the consumer can attach
Expand Down Expand Up @@ -364,12 +368,12 @@ def spawn_and_wait(instance)

context "using defaults" do
it "should start, connect, and stop cleanly" do
expect(instance.connected?).to be_truthy
expect(instance.send(:connection_open?)).to be_truthy
end
end

it "should have the correct prefetch value" do
expect(instance.instance_variable_get(:@hare_info).channel.prefetch).to eql(256)
expect(hare_info.channel.prefetch).to eql(256)
end

describe "receiving a message with a queue + exchange specified" do
Expand Down Expand Up @@ -461,6 +465,32 @@ def spawn_and_wait(instance)
end
end

context "(MarchHare) error logging" do

let(:error) do
MarchHare::Exception.new('TEST ERROR').tap do |error|
allow( error ).to receive(:cause).and_return(error_cause)
end
end
let(:error_cause) { java.io.IOException.new('TEST CAUSE') }
let(:logger) { instance.logger }

before do
queues = hare_info.channel.instance_variable_get(:@queues)
expect( queue = queues.values.first ).to_not be nil
# emulate an issue during recovery (to trigger logger.error calls)
allow( queue ).to receive(:recover_from_network_failure).and_raise(error)
allow( logger ).to receive(:error)
end

it "gets redirected to plugin logger" do
hare_info.channel.recover_queues
expect( logger ).to have_received(:error).with(/Caught exception when recovering queue/i)
expect( logger ).to have_received(:error).with('TEST ERROR', hash_including(exception: MarchHare::Exception, cause: error_cause))
end

end

describe LogStash::Inputs::RabbitMQ do
require "logstash/devutils/rspec/shared_examples"
it_behaves_like "an interruptible input plugin"
Expand Down
10 changes: 7 additions & 3 deletions spec/outputs/rabbitmq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
allow(connection).to receive(:on_shutdown)
allow(connection).to receive(:on_recovery_start)
allow(connection).to receive(:on_recovery)
allow(connection).to receive(:host).and_return host
allow(connection).to receive(:port).and_return port
allow(connection).to receive(:vhost).and_return nil
allow(connection).to receive(:user).and_return 'guest'
allow(channel).to receive(:exchange).and_return(exchange)

instance.register
Expand Down Expand Up @@ -206,7 +210,7 @@ def spawn_and_wait(instance)
instance.register

20.times do
instance.connected? ? break : sleep(0.1)
instance.send(:connection_open?) ? break : sleep(0.1)
end

# Extra time to make sure the output can attach
Expand Down Expand Up @@ -238,12 +242,12 @@ def spawn_and_wait(instance)

context "using defaults" do
it "should start, connect, and stop cleanly" do
expect(instance.connected?).to be_truthy
expect(instance.send(:connection_open?)).to be_truthy
end

it "should close cleanly" do
instance.close
expect(instance.connected?).to be_falsey
expect(instance.send(:connection_open?)).to be_falsey
end

it 'applies per message settings' do
Expand Down
8 changes: 6 additions & 2 deletions spec/plugin_mixins/rabbitmq_connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ def register
"ssl_certificate_password" => "123"}) }

it "should set the timeout to the expected value" do
expect(instance.rabbitmq_settings[:timeout]).to eql(rabbitmq_settings["connection_timeout"])
expect(instance.rabbitmq_settings[:connection_timeout]).to eql(rabbitmq_settings["connection_timeout"])
end

it "should set heartbeat to the expected value" do
expect(instance.rabbitmq_settings[:heartbeat]).to eql(rabbitmq_settings["heartbeat"])
expect(instance.rabbitmq_settings[:requested_heartbeat]).to eql(rabbitmq_settings["heartbeat"])
end

it "should set tls to the expected value" do
Expand Down Expand Up @@ -129,6 +129,10 @@ def register
allow(connection).to receive(:on_blocked)
allow(connection).to receive(:on_unblocked)
allow(connection).to receive(:on_shutdown)
allow(connection).to receive(:host).and_return host
allow(connection).to receive(:port).and_return port
allow(connection).to receive(:vhost).and_return nil
allow(connection).to receive(:user).and_return 'guest'

instance.register
end
Expand Down