Skip to content

Commit 68a1982

Browse files
authored
Merge pull request #30 from yaauie/data_timeout-fixes
data_timeout: decouple blocking duration from data timeout
2 parents cadcdcf + dcd33f2 commit 68a1982

File tree

3 files changed

+42
-11
lines changed

3 files changed

+42
-11
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.1.2
2+
- Fix: eliminate high CPU usage when data timeout is disabled and no data is available on the socket [#30](https://github.com/logstash-plugins/logstash-input-unix/pull/30)
3+
14
## 3.1.1
25
- Fix: unable to stop plugin (on LS 6.x) [#29](https://github.com/logstash-plugins/logstash-input-unix/pull/29)
36
- Refactor: plugin internals got reviewed for `data_timeout => ...` to work reliably

lib/logstash/inputs/unix.rb

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,16 +84,15 @@ def handle_socket(socket, output_queue)
8484
begin
8585
hostname = Socket.gethostname
8686
while !stop?
87-
data = socket.read_nonblock(16384, exception: false)
88-
89-
if data == :wait_readable
90-
if @data_timeout == -1 || IO.select([socket], nil, nil, @data_timeout)
91-
next # retry socket read
92-
else
93-
# socket not ready after @data_timeout seconds
94-
@logger.info("Closing connection after read timeout", :path => @path)
95-
return
96-
end
87+
data = io_interruptable_readpartial(socket, 16384, @data_timeout)
88+
89+
if data == :data_timeout
90+
# socket not ready after @data_timeout seconds
91+
@logger.info("Closing connection after read timeout", :path => @path)
92+
return
93+
elsif data == :stopping
94+
@logger.trace("Shutdown in progress", :path => @path)
95+
next # let next loop handle graceful stop
9796
end
9897

9998
@codec.decode(data) do |event|
@@ -118,6 +117,35 @@ def handle_socket(socket, output_queue)
118117
end
119118
end
120119

120+
##
121+
# Emulates `IO#readpartial` with a timeout and our plugin's stop-condition,
122+
# limiting blocking calls to windows of 10s or less to ensure it can be interrupted.
123+
#
124+
# @param readable_io [IO] the IO to read from
125+
# @param maxlen [Integer] the max bytes to be read
126+
# @param timeout [Number] the maximum number of seconds to , or -1 to disable timeouts
127+
#
128+
# @return [:data_timeout] if timeout was reached before bytes were available
129+
# @return [:stopping] if plugin stop-condition was detected before bytes were available
130+
# @return [String] a non-empty string if bytes became available before the timeout was reached
131+
def io_interruptable_readpartial(readable_io, maxlen, timeout)
132+
133+
data_timeout_deadline = timeout < 0 ? nil : Time.now + timeout
134+
maximum_blocking_seconds = timeout < 0 || timeout > 10 ? 10 : timeout
135+
136+
loop do
137+
return :stopping if stop?
138+
result = readable_io.read_nonblock(maxlen, exception: false)
139+
140+
return result if result.kind_of?(String)
141+
raise EOFError if result.nil?
142+
143+
return :data_timeout if (data_timeout_deadline && data_timeout_deadline < Time.now)
144+
IO.select([readable_io], nil, nil, maximum_blocking_seconds)
145+
end
146+
end
147+
private :io_interruptable_readpartial
148+
121149
private
122150
def server?
123151
@mode == "server"

logstash-input-unix.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-unix'
4-
s.version = '3.1.1'
4+
s.version = '3.1.2'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Reads events over a UNIX socket"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

0 commit comments

Comments
 (0)