Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions lib/logstash/inputs/pipe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,49 @@ class LogStash::Inputs::Pipe < LogStash::Inputs::Base
# command => "echo hello world"
config :command, :validate => :string, :required => true

# Should the pipe be restarted when it exits. Valid values are:
# * "always" - restart after every exit of the pipe command
# * "error" - restart only after an erroneous condition of the pipe command
# * "never" - never restart the pipe command
#
# Example:
# [source,ruby]
# restart => "always"
config :restart, :validate => :string, :default => "always", :validate => [ "always", "error", "never" ]


# Number of seconds to wait before restarting the pipe
config :wait_on_restart, :validate => :number, :default => 10

public
def register
@logger.info("Registering pipe input", :command => @command)
@host = Socket.gethostname.force_encoding(Encoding::UTF_8)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not be necessary if we add the magic utf-8 header.

end # def register

public
def run(queue)
loop do
begin
@pipe = IO.popen(@command, mode="r")
hostname = Socket.gethostname

@pipe.each do |line|
IO.popen(@command, mode="r").each do |line|
line = line.chomp
source = "pipe://#{hostname}/#{@command}"
@logger.debug? && @logger.debug("Received line", :command => @command, :line => line)
@codec.decode(line) do |event|
event["host"] = hostname
event["host"] = @host
event["command"] = @command
decorate(event)
queue << event
end
end
break unless @restart == "always"
rescue LogStash::ShutdownSignal => e
break
rescue Exception => e
@logger.error("Exception while running command", :e => e, :backtrace => e.backtrace)
@logger.error("Exception while running command", :command => @command, :e => e, :backtrace => e.backtrace)
break unless @restart == "error" || @restart == "always"
end

# Keep running the command forever.
sleep(10)
# Wait before restarting the pipe.
sleep(@wait_on_restart)
end
end # def run
end # class LogStash::Inputs::Pipe
103 changes: 102 additions & 1 deletion spec/inputs/pipe_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,43 @@
describe "inputs/pipe" do


describe "echo" do
#Minimal value to avoid an endless loop when enabling debug log
@@wait_on_restart = 0.3

describe "echo - once" do
event_count = 1
tmp_file = Tempfile.new('logstash-spec-input-pipe')

config <<-CONFIG
input {
pipe {
command => "echo ☹"
restart => "never"
}
}
CONFIG

input do |pipeline, queue|
Thread.new { pipeline.run }
sleep 0.1 while !pipeline.ready?

events = event_count.times.collect { queue.pop }
event_count.times do |i|
insist { events[i]["message"] } == "☹"
end
end # input
end

describe "echo - forever" do
event_count = 10
tmp_file = Tempfile.new('logstash-spec-input-pipe')

config <<-CONFIG
input {
pipe {
command => "echo ☹"
restart => "always"
wait_on_restart => 0
}
}
CONFIG
Expand All @@ -36,6 +65,7 @@
input {
pipe {
command => "tail -f #{tmp_file.path}"
restart => "never"
}
}
CONFIG
Expand All @@ -57,4 +87,75 @@
end # input
end

#This test is reading the log expecting only warn level,
#so enabling debug logging will make it fail
describe "invalid command - do not restart" do
error_count = 1
config <<-CONFIG
input {
pipe {
command => "@@@Invalid_Command_Test@@@"
restart => "never"
}
}
CONFIG
logger = Cabin::Channel.get(LogStash)
log_messages = Queue.new
logger.subscribe(log_messages)
input do |pipeline, queue|
Thread.new { pipeline.run }
sleep 0.1 while !pipeline.ready?
#No event pushed to the queue
insist { queue.empty?} == true
errors = error_count.times.collect { log_messages.pop }
error_count.times do |i|
insist { errors[i][:message] } == "Exception while running command"
end
#The input should not restart, there is no more error logs
retries = 0
has_more = false
while !has_more && retries < 5 do
begin
log_messages.pop(true)
has_more = true
rescue => e
end
sleep(0.1)
retries += 1
end
if has_more
raise "Input should not restart"
end
end # input
end

#This test is reading the log expecting only warn level,
#so enabling debug logging will make it fail
describe "restart on error" do
error_count = 3
config <<-CONFIG
input {
pipe {
command => "@@@Invalid_Command_Test@@@"
restart => "error"
wait_on_restart => #{@@wait_on_restart}
}
}
CONFIG

logger = Cabin::Channel.get(LogStash)
log_messages = Queue.new
logger.subscribe(log_messages)
input do |pipeline, queue|
Thread.new { pipeline.run }
sleep 0.1 while !pipeline.ready?
#No event pushed to the queue
insist { queue.empty?} == true
errors = error_count.times.collect { log_messages.pop }
error_count.times do |i|
insist { errors[i][:message] } == "Exception while running command"
end
end # input
end

end