11# encoding: utf-8
2+ require "logstash/namespace"
23require "logstash/inputs/base"
34require "logstash/inputs/threadable"
4- require "logstash/namespace"
55
66# This input will read events from a Redis instance; it supports both Redis channels and lists.
77# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
1515# `batch_count` note: If you use the `batch_count` setting, you *must* use a Redis version 2.6.0 or
1616# newer. Anything older does not support the operations used by batching.
1717#
18- class LogStash ::Inputs ::Redis < LogStash ::Inputs ::Threadable
18+ module Logstash module Inputs class Redis < LogStash ::Inputs ::Threadable
19+ # class LogStash::Inputs::Redis < LogStash::Inputs::Threadable
20+
1921 config_name "redis"
2022
2123 default :codec , "json"
@@ -57,14 +59,33 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Threadable
5759 config :batch_count , :validate => :number , :default => 1
5860
5961 public
62+ # public API
63+
64+ REDIS_INPUT_POISON_MSG = '}-{ poison message from teardown }-{' . freeze
65+
66+ # use to store a proc that can provide a redis instance or mock
67+ def add_external_redis_builder ( builder ) #callable
68+ @redis_builder = builder
69+ self
70+ end
71+
72+ # use to apply an instance directly and bypass the builder
73+ def use_redis ( instance )
74+ @redis = instance
75+ self
76+ end
77+
78+ def new_redis_instance
79+ @redis_builder . call
80+ end
81+
6082 def register
6183 require 'redis'
62- @redis = nil
6384 @redis_url = "redis://#{ @password } @#{ @host } :#{ @port } /#{ @db } "
6485
6586 # TODO remove after setting key and data_type to true
6687 if @queue
67- if @key or @data_type
88+ if @key || @data_type
6889 raise RuntimeError . new (
6990 "Cannot specify queue parameter and key or data_type"
7091 )
@@ -73,38 +94,79 @@ def register
7394 @data_type = 'list'
7495 end
7596
76- if not @key or not @data_type
97+ if ! @key || ! @data_type
7798 raise RuntimeError . new (
7899 "Must define queue, or key and data_type parameters"
79100 )
80101 end
81102 # end TODO
82103
83- @logger . info ( "Registering Redis" , :identity => identity )
104+ @redis_builder ||= method ( :internal_redis_builder )
105+
106+ # just switch on data_type once
107+ if @data_type == 'list' || @data_type == 'dummy'
108+ @run_method = method ( :list_runner )
109+ @teardown_method = method ( :list_teardown )
110+ elsif @data_type == 'channel'
111+ @run_method = method ( :channel_runner )
112+ @teardown_method = method ( :subscribe_teardown )
113+ elsif @data_type == 'pattern_channel'
114+ @run_method = method ( :pattern_channel_runner )
115+ @teardown_method = method ( :subscribe_teardown )
116+ end
117+
118+ # TODO(sissel, boertje): set @identity directly when @name config option is removed.
119+ @identity = @name != 'default' ? @name : "#{ @redis_url } #{ @data_type } :#{ @key } "
120+ @logger . info ( "Registering Redis" , :identity => @identity )
84121 end # def register
85122
86- # A string used to identify a Redis instance in log messages
87- # TODO(sissel): Use instance variables for this once the @name config
88- # option is removed.
89- private
90- def identity
91- @name || "#{ @redis_url } #{ @data_type } :#{ @key } "
123+ def run ( output_queue )
124+ @run_method . call ( output_queue )
125+ rescue LogStash ::ShutdownSignal
126+ # ignore and quit
127+ end # def run
128+
129+ def teardown
130+ @shutdown_requested = true
131+ @teardown_method . call
92132 end
93133
134+ # private methods -----------------------------
94135 private
95- def connect
96- redis = Redis . new (
136+
137+ def batched?
138+ @batch_count > 1
139+ end
140+
141+ # private
142+ def is_list_type?
143+ @data_type == 'list'
144+ end
145+
146+ # private
147+ def redis_params
148+ {
97149 :host => @host ,
98150 :port => @port ,
99151 :timeout => @timeout ,
100152 :db => @db ,
101153 :password => @password . nil? ? nil : @password . value
102- )
103- load_batch_script ( redis ) if @data_type == 'list' && ( @batch_count > 1 )
104- return redis
154+ }
155+ end
156+
157+ # private
158+ def internal_redis_builder
159+ ::Redis . new ( redis_params )
160+ end
161+
162+ # private
163+ def connect
164+ redis = new_redis_instance
165+ load_batch_script ( redis ) if batched? && is_list_type?
166+ redis
105167 end # def connect
106168
107- private
169+ # private
108170 def load_batch_script ( redis )
109171 #A Redis Lua EVAL script to fetch a count of keys
110172 #in case count is bigger than current items in queue whole queue will be returned without extra nil values
@@ -126,7 +188,7 @@ def load_batch_script(redis)
126188 @redis_script_sha = redis . script ( :load , redis_script )
127189 end
128190
129- private
191+ # private
130192 def queue_event ( msg , output_queue )
131193 begin
132194 @codec . decode ( msg ) do |event |
@@ -141,18 +203,51 @@ def queue_event(msg, output_queue)
141203 end
142204 end
143205
144- private
206+ # private
207+ def shutting_down?
208+ @shutdown_requested
209+ end
210+
211+ # private
212+ def running?
213+ !@shutdown_requested
214+ end
215+
216+ # private
217+ def list_teardown
218+ return if @redis . nil? || !@redis . connected?
219+
220+ @redis . quit rescue nil
221+ @redis = nil
222+ end
223+
224+ # private
225+ def list_runner ( output_queue )
226+ while running?
227+ begin
228+ @redis ||= connect
229+ list_listener ( @redis , output_queue )
230+ rescue ::Redis ::BaseError => e
231+ @logger . warn ( "Redis connection problem" , :exception => e )
232+ # Reset the redis variable to trigger reconnect
233+ @redis = nil
234+ sleep 1
235+ end
236+ end
237+ end
238+
239+ # private
145240 def list_listener ( redis , output_queue )
146241
147242 item = redis . blpop ( @key , 0 , :timeout => 1 )
148243 return unless item # from timeout or other conditions
149244
150245 # blpop returns the 'key' read from as well as the item result
151246 # we only care about the result (2nd item in the list).
152- queue_event ( item [ 1 ] , output_queue )
247+ queue_event ( item . last , output_queue )
153248
154249 # If @batch_count is 1, there's no need to continue.
155- return if @batch_count == 1
250+ return if ! batched?
156251
157252 begin
158253 redis . evalsha ( @redis_script_sha , [ @key ] , [ @batch_count -1 ] ) . each do |item |
@@ -173,7 +268,7 @@ def list_listener(redis, output_queue)
173268 #queue_event(item, output_queue) if item
174269 #end
175270 # --- End commented out implementation of 'batch fetch'
176- rescue Redis ::CommandError => e
271+ rescue :: Redis ::CommandError => e
177272 if e . to_s =~ /NOSCRIPT/ then
178273 @logger . warn ( "Redis may have been restarted, reloading Redis batch EVAL script" , :exception => e ) ;
179274 load_batch_script ( redis )
@@ -184,15 +279,64 @@ def list_listener(redis, output_queue)
184279 end
185280 end
186281
187- private
188- def channel_listener ( redis , output_queue )
189- redis . subscribe @key do |on |
282+ # private
283+ def subscribe_teardown
284+ return if @redis . nil? || !@redis . connected?
285+
286+ # the underlying client in redis is blocked on subscribe
287+ # need new redis instance, send poison message, causes
288+ # an unsubscribe
289+ publish_poison_message
290+
291+ # if its a SubscribedClient then:
292+ # a) the poison_message did not work
293+ # b) it does not have a disconnect method (yet)
294+ if @redis . client . is_a? ( ::Redis ::Client )
295+ @redis . client . disconnect
296+ end
297+ @redis = nil
298+ end
299+
300+ # private
301+ def publish_poison_message
302+ new_redis_instance . publish ( @key , REDIS_INPUT_POISON_MSG )
303+ end
304+
305+ # private
306+ def redis_runner
307+ begin
308+ @redis ||= connect
309+ yield
310+ rescue ::Redis ::BaseError => e
311+ @logger . warn ( "Redis connection problem" , :exception => e )
312+ # Reset the redis variable to trigger reconnect
313+ @redis = nil
314+ sleep 1
315+ retry
316+ end
317+ end
318+
319+ # private
320+ def channel_runner ( output_queue )
321+ redis_runner do
322+ channel_listener ( output_queue )
323+ end
324+ end
325+
326+ # private
327+ def channel_listener ( output_queue )
328+ @redis . subscribe ( @key ) do |on |
190329 on . subscribe do |channel , count |
191330 @logger . info ( "Subscribed" , :channel => channel , :count => count )
192331 end
193332
194333 on . message do |channel , message |
195- queue_event message , output_queue
334+ if message != REDIS_INPUT_POISON_MSG
335+ queue_event ( message , output_queue )
336+ end
337+ if shutting_down?
338+ @redis . unsubscribe ( @key )
339+ end
196340 end
197341
198342 on . unsubscribe do |channel , count |
@@ -201,15 +345,26 @@ def channel_listener(redis, output_queue)
201345 end
202346 end
203347
204- private
205- def pattern_channel_listener ( redis , output_queue )
206- redis . psubscribe @key do |on |
348+ def pattern_channel_runner ( output_queue )
349+ redis_runner do
350+ pattern_channel_listener ( output_queue )
351+ end
352+ end
353+
354+ # private
355+ def pattern_channel_listener ( output_queue )
356+ @redis . psubscribe @key do |on |
207357 on . psubscribe do |channel , count |
208358 @logger . info ( "Subscribed" , :channel => channel , :count => count )
209359 end
210360
211- on . pmessage do |ch , event , message |
212- queue_event message , output_queue
361+ on . pmessage do |pattern , channel , message |
362+ if message != REDIS_INPUT_POISON_MSG
363+ queue_event ( message , output_queue )
364+ end
365+ if shutting_down?
366+ @redis . punsubscribe ( @key )
367+ end
213368 end
214369
215370 on . punsubscribe do |channel , count |
@@ -218,51 +373,6 @@ def pattern_channel_listener(redis, output_queue)
218373 end
219374 end
220375
221- # Since both listeners have the same basic loop, we've abstracted the outer
222- # loop.
223- private
224- def listener_loop ( listener , output_queue )
225- while !@shutdown_requested
226- begin
227- @redis ||= connect
228- self . send listener , @redis , output_queue
229- rescue Redis ::BaseError => e
230- @logger . warn ( "Redis connection problem" , :exception => e )
231- # Reset the redis variable to trigger reconnect
232- @redis = nil
233- sleep 1
234- end
235- end
236- end # listener_loop
376+ # end
237377
238- public
239- def run ( output_queue )
240- if @data_type == 'list'
241- listener_loop :list_listener , output_queue
242- elsif @data_type == 'channel'
243- listener_loop :channel_listener , output_queue
244- else
245- listener_loop :pattern_channel_listener , output_queue
246- end
247- rescue LogStash ::ShutdownSignal
248- # ignore and quit
249- end # def run
250-
251- public
252- def teardown
253- @shutdown_requested = true
254-
255- if @redis
256- if @data_type == 'list'
257- @redis . quit rescue nil
258- elsif @data_type == 'channel'
259- @redis . unsubscribe rescue nil
260- @redis . connection . disconnect
261- elsif @data_type == 'pattern_channel'
262- @redis . punsubscribe rescue nil
263- @redis . connection . disconnect
264- end
265- @redis = nil
266- end
267- end
268- end # class LogStash::Inputs::Redis
378+ end end end # Redis Inputs LogStash
0 commit comments